提交 61b351a1 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 5f376274
......@@ -240,12 +240,12 @@ typedef struct STaskAttr {
SArray* pUdfInfo; // no need to free
} STaskAttr;
typedef int32_t (*__optr_open_fn_t)(void* param);
typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
......
......@@ -227,8 +227,7 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static int32_t operatorDummyOpenFn(SOperatorInfo *pOperator) {
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
......@@ -4681,9 +4680,7 @@ static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) {
}
}
static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static SSDataBlock* doTableScanImpl(SOperatorInfo *pOperator, bool* newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
......@@ -4731,9 +4728,7 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
return NULL;
}
static SSDataBlock* doTableScan(void* param, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doTableScan(SOperatorInfo *pOperator, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
......@@ -4804,8 +4799,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
return p;
}
static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*)param;
static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -4852,9 +4846,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
#endif
}
static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) {
// NOTE: this operator never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
......@@ -5170,8 +5162,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
}
}
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static int32_t prepareLoadRemoteData(SOperatorInfo *pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
......@@ -5190,9 +5181,7 @@ static int32_t prepareLoadRemoteData(void* param) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static SSDataBlock* doLoadRemoteData(SOperatorInfo *pOperator, bool* newgroup) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
......@@ -5418,7 +5407,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doTableScanImpl;
pOperator->getNextFn = doTableScanImpl;
return pOperator;
}
......@@ -5497,9 +5486,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
tsem_post(&pSourceDataInfo->pEx->ready);
}
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables.
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
......@@ -5989,8 +5977,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
}
static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6160,8 +6147,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
return NULL;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6247,46 +6233,63 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
}
// this is a blocking operator
static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
bool newgroup = true;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
}
doSetOperatorCompleted(pOperator);
finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) {
SAggOperatorInfo *pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes);
doSetOperatorCompleted(pOperator);
return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6354,9 +6357,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
return pInfo->pRes;
}
static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
......@@ -6443,8 +6444,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doLimit(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
static SSDataBlock* doLimit(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6526,8 +6526,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
return NULL;
}
static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6582,8 +6581,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
return pInfo->binfo.pRes->info.rows == 0? NULL:pInfo->binfo.pRes;
}
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6642,8 +6640,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->binfo.pRes->info.rows == 0? NULL:pIntervalInfo->binfo.pRes;
}
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6702,8 +6699,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->binfo.pRes;
}
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6836,8 +6832,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
// pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doStateWindowAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6894,8 +6889,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6954,8 +6948,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -7044,9 +7037,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti
}
}
static SSDataBlock* doFill(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
......@@ -7225,7 +7216,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doAggregate;
pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7364,7 +7356,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator";
// pOperator->operatorType = OP_Project;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
......@@ -7704,7 +7696,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
static SSDataBlock* doTagScan(void* param, bool* newgroup) {
static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -7909,8 +7901,7 @@ static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBl
}
}
static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* hashDistinct(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册