提交 3438ed13 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 9adc1ed8
......@@ -197,7 +197,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
......@@ -643,7 +643,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowCellInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
......@@ -693,6 +693,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
......@@ -713,7 +714,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
......
......@@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
......
......@@ -942,7 +942,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
assert(pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -2032,20 +2032,9 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
}
}
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) {
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) {
STableQueryInfo* pTableQueryInfo = buf;
pTableQueryInfo->lastKey = win.skey;
// set more initial size of interval/groupby query
// if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) {
int32_t initialSize = 128;
// int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
// if (code != TSDB_CODE_SUCCESS) {
// return NULL;
// }
// } else { // in other aggregate query, do not initialize the windowResInfo
// }
return pTableQueryInfo;
}
......@@ -2058,8 +2047,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
// cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowCellInfoOffset) {
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
......@@ -2160,7 +2148,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
}
}
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
}
void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) {
......@@ -3202,7 +3190,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -3221,8 +3209,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
*newgroup = false;
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
......@@ -3397,8 +3383,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
bool newgroup = false;
return pOperator->fpSet.getNextFn(pOperator, &newgroup);
return pOperator->fpSet.getNextFn(pOperator);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
......@@ -3569,7 +3554,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -3716,7 +3701,7 @@ _error:
return NULL;
}
static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doSort(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -3819,7 +3804,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
bool newgroup = true;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -3869,7 +3854,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
......@@ -4086,7 +4071,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
}
}
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
......@@ -4124,21 +4109,18 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
bool prevVal = *newgroup;
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prevVal;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
}
// Return result of the previous group in the firstly.
if (*newgroup) {
if (false) {
if (pRes->info.rows > 0) {
pProjectInfo->existDataBlock = pBlock;
break;
......@@ -4208,7 +4190,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
}
}
static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -4220,6 +4202,9 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
// todo handle different group data interpolation
bool n = false;
bool *newgroup = &n;
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
return pResBlock;
......@@ -4228,7 +4213,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup);
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
......@@ -4394,7 +4379,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
}
STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pTableQueryInfo, false, win);
createTableQueryInfo(pTableQueryInfo, win);
return pTableQueryInfo;
}
......@@ -5529,7 +5514,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SSDataBlock* pRes = pJoinInfo->pRes;
......@@ -5539,12 +5524,10 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
int32_t nrows = 0;
while (1) {
bool prevVal = *newgroup;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup);
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->leftPos = 0;
......@@ -5557,7 +5540,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1];
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup);
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->rightPos = 0;
......
......@@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -541,7 +541,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -558,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......
......@@ -257,11 +257,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
*newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
......@@ -289,7 +287,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -298,10 +296,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
*newgroup = false;
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
......@@ -334,7 +330,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
while (pTableScanInfo->current < total) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
......@@ -421,13 +417,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
return pOperator;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableScanInfo* pTableScanInfo = pOperator->info;
*newgroup = false;
STableBlockDistInfo tableBlockDist = {0};
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
......@@ -514,7 +509,7 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
......@@ -859,7 +854,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
return pBlock;
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
......@@ -1191,7 +1186,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
return pOperator;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......
......@@ -131,7 +131,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -763,12 +763,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC;
// STimeWindow win = {0};
bool newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -883,7 +882,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -908,7 +907,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -934,7 +933,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -945,7 +944,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
SSDataBlock* pBlock = pInfo->binfo.pRes;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->fpSet.getStreamResFn(pOperator, newgroup);
return pOperator->fpSet.getStreamResFn(pOperator);
} else {
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
......@@ -985,7 +984,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
......@@ -1002,14 +1001,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
}
// STimeWindow win = {0};
*newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -1239,7 +1237,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -1262,7 +1260,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -1289,7 +1287,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -1309,7 +1307,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......
......@@ -55,7 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock* pBlock;
} SDummyInputInfo;
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......@@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册