提交 07a8f98b 编写于 作者: H Haojun Liao

[td-11818] Refactor.

上级 3a23a30a
...@@ -5278,7 +5278,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5278,7 +5278,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "TableScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan; pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false; pOptr->blockingOptr = false;
......
...@@ -456,7 +456,7 @@ typedef struct SAggOperatorInfo { ...@@ -456,7 +456,7 @@ typedef struct SAggOperatorInfo {
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
STableQueryInfo *current; STableQueryInfo *current;
uint32_t groupId; uint32_t groupId;
SGroupResInfo *pGroupResInfo; SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo; STableQueryInfo *pTableQueryInfo;
} SAggOperatorInfo; } SAggOperatorInfo;
......
...@@ -5290,7 +5290,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5290,7 +5290,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = size; pOperator->numOfOutput = size;
pOperator->pRuntimeEnv = NULL;
pOperator->exec = doLoadRemoteData; pOperator->exec = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -5365,7 +5364,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -5365,7 +5364,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pInfo->current = 0; pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pOperator->name = "DataBlocksOptimizedScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
...@@ -6181,9 +6180,9 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { ...@@ -6181,9 +6180,9 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -7061,7 +7060,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n ...@@ -7061,7 +7060,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) { for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j); STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index]; STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index++];
pTQueryInfo->uid = pk->uid; pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey; pTQueryInfo->lastKey = pk->lastKey;
pTQueryInfo->groupIndex = i; pTQueryInfo->groupIndex = i;
...@@ -7093,9 +7092,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE ...@@ -7093,9 +7092,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo);
setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo); setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo);
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = exprArrayDup(pExprInfo);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
...@@ -7103,8 +7099,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE ...@@ -7103,8 +7099,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = p; pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册