提交 09fc0b79 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 723a6bd9
......@@ -254,6 +254,17 @@ enum {
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_get_explain_fn_t getExplainFn;
} SOperatorFpSet;
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
......@@ -267,15 +278,7 @@ typedef struct SOperatorInfo {
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
// todo extract struct
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_get_explain_fn_t getExplainFn;
SOperatorFpSet fpSet;
} SOperatorInfo;
typedef struct {
......@@ -609,6 +612,10 @@ typedef struct SJoinOperatorInfo {
SNode *pOnCondition;
} SJoinOperatorInfo;
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
......@@ -653,6 +660,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......
......@@ -159,7 +159,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
......
......@@ -226,6 +226,23 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain) {
SOperatorFpSet fpSet = {
._openFn = openFn,
.getNextFn = nextFn,
.getStreamResFn = streamFn,
.cleanupFn = cleanup,
.closeFn = closeFn,
.encodeResultRow = encode,
.decodeResultRow = decode,
.getExplainFn = explain,
};
return fpSet;
}
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
......@@ -4081,7 +4098,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -4176,9 +4193,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->info = pInfo;
pOperator->numOfOutput = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo,
NULL, NULL, NULL);
#if 1
{ // todo refactor
......@@ -4289,7 +4306,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
bool newgroup = false;
return pOperator->getNextFn(pOperator, &newgroup);
return pOperator->fpSet.getNextFn(pOperator, &newgroup);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
......@@ -4586,9 +4603,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSortedMerge;
pOperator->closeFn = destroySortedMergeOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
NULL, NULL, NULL);
code = appendDownstream(pOperator, downstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -4667,8 +4684,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo,
NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -4710,7 +4727,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
bool newgroup = true;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -4765,7 +4782,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -5007,7 +5024,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -5070,7 +5087,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -5121,9 +5138,9 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
SSDataBlock* pBlock = pInfo->binfo.pRes;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup);
return pOperator->fpSet.getStreamResFn(pOperator, newgroup);
} else {
pTaskInfo->code = pOperator->_openFn(pOperator);
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -5165,7 +5182,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -5216,7 +5233,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -5265,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -5400,7 +5417,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -5451,7 +5468,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -5531,7 +5548,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup);
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
......@@ -5603,8 +5620,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
return;
}
if (pOperator->closeFn != NULL) {
pOperator->closeFn(pOperator->info, pOperator->numOfOutput);
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput);
}
if (pOperator->pDownstream != NULL) {
......@@ -5739,12 +5756,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5871,9 +5885,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = num;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doProjectOperation;
pOperator->closeFn = destroyProjectOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo,
NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -5929,12 +5943,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doBuildIntervalResult;
pOperator->getStreamResFn = doStreamIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5951,6 +5962,65 @@ _error:
return NULL;
}
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "StreamTimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
......@@ -5969,8 +6039,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doAllIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo,
NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -6010,10 +6081,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->getNextFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -6057,10 +6127,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
......@@ -6147,12 +6216,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doFill;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroySFillOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo,
NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -7017,8 +7084,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
(*pRes)[*resNum].startupCost = operatorInfo->cost.openCost;
(*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost;
if (operatorInfo->getExplainFn) {
int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
if (operatorInfo->fpSet.getExplainFn) {
int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
if (code) {
qError("operator getExplainFn failed, error:%s", tstrerror(code));
return code;
......@@ -7055,7 +7122,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup);
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup);
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->leftPos = 0;
......@@ -7068,7 +7135,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1];
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup);
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup);
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->rightPos = 0;
......@@ -7161,9 +7228,9 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMergeJoin;
pOperator->closeFn = destroyBasicOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo,
NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
return pOperator;
......
......@@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -360,12 +360,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -562,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -618,14 +614,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
pOperator->numOfOutput = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashPartition;
pOperator->closeFn = destroyPartitionOperatorInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......
......@@ -405,7 +405,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan;
pOperator->fpSet.getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
static int32_t cost = 0;
......@@ -429,7 +429,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->getNextFn = doTableScanImpl;
pOperator->fpSet.getNextFn = doTableScanImpl;
return pOperator;
}
......@@ -502,8 +502,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doBlockInfoScan;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doBlockInfoScan;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -532,7 +532,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -659,9 +659,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -981,8 +981,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysScanOperator;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -1139,11 +1139,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr,
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->getNextFn = doTagScan;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyTagScanOperatorInfo;
return pOperator;
_error:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册