提交 902fcb0c 编写于 作者: H Haojun Liao

refactor(query): refactor code.

上级 30602b8c
...@@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor ...@@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window SResultRowPosition pos; // current active time window
// uint64_t uid; // table uid
// int32_t groupIndex; // group id in table list // int32_t groupIndex; // group id in table list
// SVariant tag; // SVariant tag;
// SResultRowInfo resInfo; // result info // SResultRowInfo resInfo; // result info
...@@ -433,10 +432,9 @@ typedef struct STableIntervalOperatorInfo { ...@@ -433,10 +432,9 @@ typedef struct STableIntervalOperatorInfo {
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SAggSupporter aggSup; SAggSupporter aggSup;
STableQueryInfo *current; STableQueryInfo *current;
uint32_t groupId; uint64_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo; STableQueryInfo *pTableQueryInfo;
...@@ -633,7 +631,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -633,7 +631,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
......
...@@ -114,7 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { ...@@ -114,7 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// ASSERT(0);
// SResultRow* pRow = pResultRowInfo->pResult[i]; // SResultRow* pRow = pResultRowInfo->pResult[i];
// if (pRow->closed) { // if (pRow->closed) {
// continue; // continue;
......
...@@ -242,8 +242,7 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, ...@@ -242,8 +242,7 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision,
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SArray* getOrderCheckColumns(STaskAttr* pQuery); SArray* getOrderCheckColumns(STaskAttr* pQuery);
...@@ -511,8 +510,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR ...@@ -511,8 +510,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
// pResultRowInfo->curPos = pResultRowInfo->size; // pResultRowInfo->curPos = pResultRowInfo->size;
pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// int64_t index = pResultRowInfo->curPos;
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur, POINTER_BYTES); taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur, POINTER_BYTES);
} else { } else {
...@@ -2887,7 +2884,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD ...@@ -2887,7 +2884,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { if (!isRowEntryInitialized(pResInfo)) {
continue; continue;
} }
...@@ -3051,18 +3048,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -3051,18 +3048,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock);
} }
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0; int64_t uid = 0;
int64_t tid = 0;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SResultRow* pResultRow = SResultRow* pResultRow =
doSetResultOutBufByKey_rv(pAggInfo->pResultBuf, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), doSetResultOutBufByKey_rv(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
true, uid, pTaskInfo, false, &pAggInfo->aggSup); true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
assert(pResultRow != NULL); assert(pResultRow != NULL);
/* /*
...@@ -3070,7 +3066,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i ...@@ -3070,7 +3066,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize); int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return; return;
} }
...@@ -3079,18 +3075,15 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i ...@@ -3079,18 +3075,15 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
} }
void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo, void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) {
STableQueryInfo* pTableQueryInfo, SAggOperatorInfo* pAggInfo) { if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == tableGroupId) {
return; return;
} }
doSetTableGroupOutputBuf(pAggInfo, numOfOutput, tableGroupId, pTaskInfo); doSetTableGroupOutputBuf(pAggInfo, numOfOutput, groupId, pTaskInfo);
// record the current active group id // record the current active group id
pAggInfo->groupId = tableGroupId; pAggInfo->groupId = groupId;
} }
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
...@@ -4841,7 +4834,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -4841,7 +4834,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOptrBasicInfo* pInfo = &pAggInfo->binfo;
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
...@@ -4866,6 +4861,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -4866,6 +4861,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, 0, pInfo->pCtx); doAggregateImpl(pOperator, 0, pInfo->pCtx);
...@@ -4885,8 +4881,11 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -4885,8 +4881,11 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
#endif #endif
} }
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf,
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pAggInfo->groupResInfo, &pAggInfo->binfo.resultRowInfo);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4905,9 +4904,13 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) ...@@ -4905,9 +4904,13 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
return NULL; return NULL;
} }
getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doSetOperatorCompleted(pOperator); toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
doSetOperatorCompleted(pOperator);
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
} }
...@@ -5021,74 +5024,6 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi ...@@ -5021,74 +5024,6 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi
return true; return true;
} }
static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->pRes;
}
// table scan order
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// if (downstream->operatorType == OP_TableScan) {
// STableScanInfo* pScanInfo = downstream->info;
// order = getTableScanOrder(pScanInfo);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY key = 0;
if (order == TSDB_ORDER_ASC) {
key = pBlock->info.window.ekey;
TSKEY_MAX_ADD(key, 1);
} else {
key = pBlock->info.window.skey;
TSKEY_MIN_SUB(key, -1);
}
// setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current,
// pAggInfo);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->resultRowInfo);
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pInfo->pRes;
}
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo; SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
...@@ -5887,7 +5822,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5887,7 +5822,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1; int32_t numOfRows = 1;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str);
...@@ -5896,7 +5830,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5896,7 +5830,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pOperator->resultInfo.capacity = 4096;
pOperator->resultInfo.threshold = 4096 * 0.75;
int32_t numOfGroup = 10; // todo replaced with true value
pInfo->groupId = INT32_MIN;
initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup);
pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr; pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) { if (pInfo->pScalarExprInfo != NULL) {
...@@ -5910,11 +5850,11 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5910,11 +5850,11 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = doOpenAggregateOptr; pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult; pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow; pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow; pOperator->decodeResultRow = aggDecodeResultRow;
...@@ -6005,46 +5945,6 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6005,46 +5945,6 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
} }
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
}
size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate";
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
return NULL;
}
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
SArray* pList = taosArrayInit(4, sizeof(int32_t)); SArray* pList = taosArrayInit(4, sizeof(int32_t));
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
...@@ -6824,16 +6724,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6824,16 +6724,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { }
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
taosMemoryFree(ops); taosMemoryFree(ops);
return pOptr; return pOptr;
......
...@@ -65,6 +65,21 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { ...@@ -65,6 +65,21 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#endif #endif
} }
// relocated the column data according to the slotId
static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) {
int32_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
if (!pmInfo->output) {
continue;
}
ASSERT(pmInfo->colId == p->info.colId);
taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p);
}
}
int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
...@@ -110,50 +125,36 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, ...@@ -110,50 +125,36 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
} }
pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
} }
} else {
// failed to load the block sma data, data block statistics does not exist, load data block instead
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
}
return TSDB_CODE_SUCCESS;
}
if (*status == FUNC_DATA_REQUIRED_DATA_LOAD) {
// todo filter data block according to the block sma data firstly
#if 0
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
} }
#endif }
pCost->totalCheckedRows += pBlock->info.rows; ASSERT (*status == FUNC_DATA_REQUIRED_DATA_LOAD);
pCost->loadBlocks += 1;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); // todo filter data block according to the block sma data firstly
if (pCols == NULL) { #if 0
return terrno; if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
} pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
#endif
// relocated the column data into the correct slotId pCost->totalCheckedRows += pBlock->info.rows;
int32_t numOfCols = pBlock->info.numOfCols; pCost->loadBlocks += 1;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
ASSERT(pColMatchInfo->colId == p->info.colId); SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); if (pCols == NULL) {
// taosArraySet(pBlock->pBlockAgg) return terrno;
}
} }
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
doFilter(pTableScanInfo->pFilterNode, pBlock); doFilter(pTableScanInfo->pFilterNode, pBlock);
if (pBlock->info.rows == 0) { if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
......
...@@ -386,19 +386,24 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { ...@@ -386,19 +386,24 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
GET_TYPED_DATA(prev, uint64_t, type, buf); GET_TYPED_DATA(prev, uint64_t, type, buf);
uint64_t val = GET_UINT64_VAL(tval); uint64_t val = GET_UINT64_VAL(tval);
UPDATE_DATA(pCtx, prev, val, numOfElems, isMinFunc, key); if ((prev < val) ^ isMinFunc) {
} else if (type == TSDB_DATA_TYPE_DOUBLE) { *(uint64_t*) buf = val;
double prev = 0; for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
GET_TYPED_DATA(prev, double, type, buf); SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
__ctx->tag.i = key;
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
}
__ctx->fpSet.process(__ctx);
}
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, prev, val, numOfElems, isMinFunc, key); UPDATE_DATA(pCtx, *(double*) buf, val, numOfElems, isMinFunc, key);
} else if (type == TSDB_DATA_TYPE_FLOAT) { } else if (type == TSDB_DATA_TYPE_FLOAT) {
float prev = 0;
GET_TYPED_DATA(prev, float, type, buf);
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, prev, (float)val, numOfElems, isMinFunc, key); UPDATE_DATA(pCtx, *(float*) buf, val, numOfElems, isMinFunc, key);
} }
return numOfElems; return numOfElems;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册