提交 7778f9a7 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 e30eb2e4
...@@ -657,7 +657,7 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -657,7 +657,7 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
......
...@@ -211,6 +211,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); ...@@ -211,6 +211,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
...@@ -5224,9 +5225,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { ...@@ -5224,9 +5225,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5235,11 +5235,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -5235,11 +5235,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
size_t numOfSources = LIST_LENGTH(pSources); size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
if (pInfo->pSources == NULL) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
tfree(pInfo); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pOperator); goto _error;
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
for(int32_t i = 0; i < numOfSources; ++i) { for(int32_t i = 0; i < numOfSources; ++i) {
...@@ -5247,16 +5245,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -5247,16 +5245,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
taosArrayPush(pInfo->pSources, pNode); taosArrayPush(pInfo->pSources, pNode);
} }
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pInfo);
tfree(pOperator);
taosArrayDestroy(pInfo->pSources);
taosArrayDestroy(pInfo->pSourceDataInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
int32_t code = initDataSource(numOfSources, pInfo); int32_t code = initDataSource(numOfSources, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -5307,12 +5295,12 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -5307,12 +5295,12 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0); destroyExchangeOperatorInfo(pInfo, numOfSources);
} }
tfree(pInfo); tfree(pInfo);
tfree(pOperator); tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -5351,7 +5339,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -5351,7 +5339,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
tfree(pInfo); tfree(pInfo);
tfree(pOperator); tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -5706,7 +5694,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { ...@@ -5706,7 +5694,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
} }
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput);
static void clearupAggSup(SAggSupporter* pAggSup); static void cleanupAggSup(SAggSupporter* pAggSup);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param;
...@@ -5718,7 +5706,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5718,7 +5706,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
} }
blockDataDestroy(pInfo->binfo.pRes); blockDataDestroy(pInfo->binfo.pRes);
clearupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6170,7 +6158,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -6170,7 +6158,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo); tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -7132,7 +7120,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n ...@@ -7132,7 +7120,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void clearupAggSup(SAggSupporter* pAggSup) { static void cleanupAggSup(SAggSupporter* pAggSup) {
tfree(pAggSup->keyBuf); tfree(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowHashTable);
taosHashCleanup(pAggSup->pResultRowListSet); taosHashCleanup(pAggSup->pResultRowListSet);
...@@ -7147,6 +7135,9 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ ...@@ -7147,6 +7135,9 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pInfo->pTableQueryInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) {
...@@ -7170,14 +7161,20 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ ...@@ -7170,14 +7161,20 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t numOfRows = 1;
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1;
int32_t code = initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7190,9 +7187,19 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7190,9 +7187,19 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->_openFn = doOpenAggregateOptr; pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult; pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator; return pOperator;
_error:
destroyAggOperatorInfo(pInfo, numOfCols);
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
...@@ -7205,33 +7212,42 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { ...@@ -7205,33 +7212,42 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput); doDestroyBasicInfo(pInfo, numOfOutput);
} }
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData); tfree(pInfo->prevData);
} }
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
destroyDiskbasedBuf(pInfo->pResultBuf);
}
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*) param; SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p); tfree(pInfo->p);
} }
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData); tfree(pInfo->prevData);
...@@ -7316,14 +7332,20 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -7316,14 +7332,20 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pCtx == NULL) {
goto _error;
}
// initResultRowInfo(&pBInfo->resultRowInfo, 8); // initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN); // setFunctionResultOutput(pBInfo, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7338,8 +7360,15 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -7338,8 +7360,15 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_OUT_OF_MEMORY) {
goto _error;
}
return pOperator; return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
...@@ -7375,43 +7404,51 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 ...@@ -7375,43 +7404,51 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0; return 0;
} }
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
ASSERT(numOfDownstream == 1);
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = *pLimit;
pInfo->currentOffset = pLimit->offset;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->limit = *pLimit;
pInfo->currentOffset = pLimit->offset;
pOperator->name = "LimitOperator"; pOperator->name = "LimitOperator";
// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT; // pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doLimit; pOperator->getNextFn = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error:
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO; pInfo->precision = TSDB_TIME_PRECISION_MICRO;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow; // pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7421,11 +7458,19 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7421,11 +7458,19 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->getNextFn = doIntervalAgg; pOperator->_openFn = operatorDummyOpenFn;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->getNextFn = doIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = code;
return NULL;
} }
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
...@@ -7478,15 +7523,21 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper ...@@ -7478,15 +7523,21 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = calloc(1, sizeof(SSessionAggOperatorInfo)); SSessionAggOperatorInfo* pInfo = calloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
pInfo->binfo.pRes = pResBlock; if (code != TSDB_CODE_SUCCESS) {
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); goto _error;
}
pInfo->prevTs = INT64_MIN; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->reptScan = false;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow; // pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7499,8 +7550,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -7499,8 +7550,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->closeFn = destroySWindowOperatorInfo; pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error:
if (pInfo != NULL) {
destroySWindowOperatorInfo(pInfo, numOfCols);
}
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = code;
return NULL;
} }
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册