未验证 提交 405c4c63 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #14991 from taosdata/feature/3_liaohj

fix(query):support multigroup in groupby operator.
...@@ -115,9 +115,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int ...@@ -115,9 +115,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
p->groupId = *(uint64_t*)key; p->groupId = *(uint64_t*)key;
p->pos = *(SResultRowPosition*)pData; p->pos = *(SResultRowPosition*)pData;
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
#ifdef BUF_PAGE_DEBUG
qDebug("page_groupRes, groupId:%" PRIu64 ",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
#endif
taosArrayPush(pGroupResInfo->pRows, &p); taosArrayPush(pGroupResInfo->pRows, &p);
} }
......
...@@ -1506,15 +1506,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI ...@@ -1506,15 +1506,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
int32_t numOfExprs) { int32_t numOfExprs) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t start = pGroupResInfo->index; int32_t start = pGroupResInfo->index;
#ifdef BUF_PAGE_DEBUG
qDebug("\npage_copytoblock rows:%d", numOfRows);
#endif
for (int32_t i = start; i < numOfRows; i += 1) { for (int32_t i = start; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
#ifdef BUF_PAGE_DEBUG
qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
#endif
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
...@@ -2222,6 +2218,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2222,6 +2218,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
if (completed == totalSources) { if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs); return setAllSourcesCompleted(pOperator, startTs);
} }
sched_yield();
} }
_error: _error:
...@@ -3748,7 +3746,7 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -3748,7 +3746,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo); taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) { if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult); pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
} }
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
...@@ -264,7 +264,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -264,7 +264,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, 0, pInfo->aggSup.pResultBuf, &pInfo->aggSup); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -282,7 +282,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -282,7 +282,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = int32_t ret =
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
0, pInfo->aggSup.pResultBuf, &pInfo->aggSup); pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -293,6 +293,29 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -293,6 +293,29 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
} }
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -304,22 +327,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -304,22 +327,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
while(1) { return buildGroupResultDataBlock(pOperator);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
...@@ -373,26 +381,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -373,26 +381,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return buildGroupResultDataBlock(pOperator);
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pRes;
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -800,7 +789,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -800,7 +789,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
} }
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) { uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
......
...@@ -234,9 +234,9 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -234,9 +234,9 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -539,14 +539,12 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { ...@@ -539,14 +539,12 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
} }
pInfo->startTs = taosGetTimestampUs(); pInfo->startTs = taosGetTimestampUs();
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str); pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort); tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
...@@ -556,7 +554,6 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { ...@@ -556,7 +554,6 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
} }
int32_t code = tsortOpen(pInfo->pSortHandle); int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno); longjmp(pTaskInfo->env, terrno);
} }
...@@ -674,6 +671,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -674,6 +671,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
......
...@@ -109,6 +109,10 @@ static int32_t sortComparClearup(SMsortComparParam* cmpParam) { ...@@ -109,6 +109,10 @@ static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
} }
void tsortDestroySortHandle(SSortHandle* pSortHandle) { void tsortDestroySortHandle(SSortHandle* pSortHandle) {
if (pSortHandle == NULL) {
return;
}
tsortClose(pSortHandle); tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) { if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(pSortHandle->pMergeTree); tMergeTreeDestroy(pSortHandle->pMergeTree);
...@@ -119,7 +123,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { ...@@ -119,7 +123,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
blockDataDestroy(pSortHandle->pDataBlock); blockDataDestroy(pSortHandle->pDataBlock);
for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){ for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){
SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i); SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i);
blockDataDestroy((*pSource)->src.pBlock);
taosMemoryFreeClear(*pSource); taosMemoryFreeClear(*pSource);
} }
taosArrayDestroy(pSortHandle->pOrderedSource); taosArrayDestroy(pSortHandle->pOrderedSource);
......
...@@ -426,7 +426,7 @@ class TDTestCase: ...@@ -426,7 +426,7 @@ class TDTestCase:
tdSql.query(" select unique(t1+c1) from stb1 ") tdSql.query(" select unique(t1+c1) from stb1 ")
tdSql.checkRows(13) tdSql.checkRows(13)
tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ") tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ")
tdSql.checkRows(13) tdSql.checkRows(20)
tdSql.query(" select unique(t1) from stb1 partition by tbname ") tdSql.query(" select unique(t1) from stb1 partition by tbname ")
tdSql.checkRows(2) tdSql.checkRows(2)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册