提交 9800c8d5 编写于 作者: H Haojun Liao

[td-13043]refactor operator of multitable aggregate.

上级 896b269f
......@@ -126,6 +126,13 @@ static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFile
// return ((char *)page->data) + rowOffset + offset * numOfRows;
}
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
assert(rowOffset >= 0);
int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......
......@@ -99,10 +99,7 @@ typedef struct SSingleColumnFilterInfo {
typedef struct STableQueryInfo {
TSKEY lastKey;
int32_t groupIndex; // group id in table list
SVariant tag;
// STimeWindow win; // todo remove it later
// STSCursor cur;
// void* pTable; // for retrieve the page id list
// SVariant tag;
SResultRowInfo resInfo;
} STableQueryInfo;
......@@ -442,6 +439,7 @@ typedef struct SOptrBasicInfo {
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
uint32_t resRowSize;
} SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
......@@ -449,13 +447,14 @@ typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
uint32_t seed;
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
STableQueryInfo* current;
STableQueryInfo *current;
uint32_t groupId;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
......
......@@ -226,10 +226,9 @@ static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo)
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SqlFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo);
SArray* getOrderCheckColumns(STaskAttr* pQuery);
SArray* getOrderCheckColumns(STaskAttr* pQuery);
typedef struct SRowCompSupporter {
......@@ -3642,7 +3641,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
return;
}
taosVariantDestroy(&pTableQueryInfo->tag);
// taosVariantDestroy(&pTableQueryInfo->tag);
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
......@@ -3679,14 +3678,49 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pRes
}
}
void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SqlFunctionCtx* pCtx,
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) {
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getBufPage(pBuf, pResult->pageId);
int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
offset += pCtx[i].resDataInfo.bytes;
continue;
}
pCtx[i].pOutput = getPosInResultPage_rv(bufPage, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId < 0) {
continue;
}
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
}
// if (!pResInfo->initialized) {
// aAggs[functionId].init(&pCtx[i], pResInfo);
// }
}
}
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) {
// for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0;
int64_t tid = 0;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SResultRow* pResultRow =
doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, pAggInfo);
assert (pResultRow != NULL);
/*
......@@ -3694,29 +3728,26 @@ void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRes
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, tableGroupId, pRuntimeEnv->pQueryAttr->resultRowSize);
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.resRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return;
}
}
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowOutputBufInitCtx_rv(pAggInfo->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
}
void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo, STableQueryInfo *pTableQueryInfo, SAggOperatorInfo* pAggInfo) {
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == tableGroupId) {
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == tableGroupId) {
return;
}
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, tableGroupId);
doSetTableGroupOutputBuf(pAggInfo, numOfOutput, tableGroupId, pTaskInfo);
// record the current active group id
pRuntimeEnv->prevGroupId = tableGroupId;
pAggInfo->groupId = tableGroupId;
}
void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx,
......@@ -4013,14 +4044,12 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime
}
}
static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// update the number of result for each, only update the number of rows for the corresponding window result.
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
return;
}
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return;
// }
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
......@@ -4031,8 +4060,8 @@ static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SqlFunctio
continue;
}
// SResultRowEntryInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset);
// pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
SResultRowEntryInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset);
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
}
}
}
......@@ -6163,21 +6192,20 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->pRes;
}
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
// table scan order
int32_t order = TSDB_ORDER_ASC;//pQueryAttr->order.order;
SOperatorInfo* downstream = pOperator->pDownstream[0];
......@@ -6191,7 +6219,6 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// if (downstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = downstream->info;
// order = getTableScanOrder(pScanInfo);
......@@ -6201,7 +6228,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY key = 0;
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
if (order == TSDB_ORDER_ASC) {
key = pBlock->info.window.ekey;
TSKEY_MAX_ADD(key, 1);
} else {
......@@ -6209,20 +6236,18 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
TSKEY_MIN_SUB(key, -1);
}
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, pAggInfo);
doAggregateImpl(pOperator, 0, pInfo->pCtx, pBlock);
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->resultRowInfo);
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo,
pInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo);
// initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
doSetOperatorCompleted(pOperator);
}
......@@ -6878,7 +6903,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
} else {
updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
}
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册