提交 956173e2 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 f2a27f58
...@@ -433,10 +433,7 @@ typedef struct SAggOperatorInfo { ...@@ -433,10 +433,7 @@ typedef struct SAggOperatorInfo {
STableQueryInfo *current; STableQueryInfo *current;
uint64_t groupId; uint64_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SExprInfo *pScalarExprInfo; SExprSupp scalarExprSup;
int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied
SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct.
int32_t *rowEntryInfoOffset; // offset value for each row result cell info
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
...@@ -662,15 +659,19 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, ...@@ -662,15 +659,19 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols); void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSup(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList); SArray* pColList);
......
...@@ -90,8 +90,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type ...@@ -90,8 +90,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock); static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
...@@ -2862,9 +2860,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -2862,9 +2860,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
// there is an scalar expression that needs to be calculated before apply the group aggregation. // there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) { if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
pAggInfo->numOfScalarExpr, NULL); code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -3405,13 +3403,10 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -3405,13 +3403,10 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
destroyDiskbasedBuf(pAggSup->pResultBuf); destroyDiskbasedBuf(pAggSup->pResultBuf);
} }
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { size_t keyBufSize, const char* pkey) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pSup->rowEntryInfoOffset); initExprSupp(pSup, pExprInfo, numOfCols);
pBasicInfo->pRes = pResultBlock;
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf; pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
} }
...@@ -3428,6 +3423,19 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { ...@@ -3428,6 +3423,19 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
} }
} }
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
pInfo->pRes = pBlock;
initResultRowInfo(&pInfo->resultRowInfo);
}
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfExpr;
if (pSup->pExprInfo != NULL) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
}
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
...@@ -3441,28 +3449,20 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3441,28 +3449,20 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
int32_t code = int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->groupId = INT32_MIN; initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) {
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowEntryInfoOffset);
}
pOperator->name = "TableAggregate"; pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
...@@ -3501,7 +3501,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -3501,7 +3501,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL; return NULL;
} }
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
assert(pInfo != NULL); assert(pInfo != NULL);
cleanupResultRowInfo(&pInfo->resultRowInfo); cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
...@@ -3509,12 +3509,12 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { ...@@ -3509,12 +3509,12 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
doDestroyBasicInfo(pInfo, numOfOutput); cleanupBasicInfo(pInfo);
} }
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
} }
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -3529,7 +3529,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3529,7 +3529,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
return; return;
} }
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
} }
...@@ -3543,7 +3543,7 @@ void cleanupExecSupp(SExprSupp* pSupp) { ...@@ -3543,7 +3543,7 @@ void cleanupExecSupp(SExprSupp* pSupp) {
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param; SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
...@@ -3609,7 +3609,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -3609,7 +3609,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
...@@ -3618,8 +3619,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -3618,8 +3619,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
...@@ -3719,11 +3718,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3719,11 +3718,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
int32_t numOfExpr = 0; int32_t numOfExpr = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr); SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr);
int32_t numOfScalarExpr = 0;
if (pPhyNode->pExprs != NULL) { if (pPhyNode->pExprs != NULL) {
SExprSupp* pSup = &pInfo->scalarSup; SExprSupp* pSup1 = &pInfo->scalarSup;
pSup->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup->numOfExprs); pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, numOfScalarExpr, &pSup->rowEntryInfoOffset); pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset);
} }
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
...@@ -3739,20 +3737,22 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3739,20 +3737,22 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
} }
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExpr; pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL); destroyIndefinitOperatorInfo, NULL, NULL, NULL);
......
...@@ -33,7 +33,7 @@ static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo ...@@ -33,7 +33,7 @@ static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->keyBuf); taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
...@@ -397,7 +397,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -397,7 +397,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
} }
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
...@@ -665,7 +666,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { ...@@ -665,7 +666,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
......
...@@ -1354,19 +1354,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1354,19 +1354,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData); taosMemoryFreeClear(pInfo->stateKey.pData);
} }
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
} }
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
if (pInfo->pChildren) { if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -1461,9 +1461,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1461,9 +1461,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
int32_t code = int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initAggInfo(&pInfo->binfo, pSup, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock);
if (isStream) { if (isStream) {
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx); increaseTs(pSup->pCtx);
...@@ -1531,8 +1531,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1531,8 +1531,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
int32_t code = int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1966,7 +1966,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -1966,7 +1966,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = *pTwAggSup; pInfo->twAggSup = *pTwAggSup;
...@@ -1995,7 +1997,7 @@ _error: ...@@ -1995,7 +1997,7 @@ _error:
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -2010,12 +2012,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -2010,12 +2012,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
int32_t code = int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
...@@ -2329,8 +2332,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2329,8 +2332,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pOperator->exprSupp.pCtx); increaseTs(pOperator->exprSupp.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
...@@ -2400,7 +2405,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { ...@@ -2400,7 +2405,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup); destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
if (pInfo->pChildren != NULL) { if (pInfo->pChildren != NULL) {
...@@ -2415,12 +2420,14 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2415,12 +2420,14 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
int32_t initBasicInfo(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pSup->rowEntryInfoOffset); initExprSupp(pSup, pExprInfo, numOfCols);
pBasicInfo->pRes = pResultBlock; initBasicInfo(pBasicInfo, pResultBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = NULL; pSup->pCtx[i].pBuf = NULL;
} }
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx); increaseTs(pSup->pCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2459,11 +2466,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2459,11 +2466,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
code = initBasicInfo(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pSup->pCtx, numOfCols); code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pSup->pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -3031,7 +3039,7 @@ _error: ...@@ -3031,7 +3039,7 @@ _error:
void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param; SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup); destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
if (pInfo->pChildren != NULL) { if (pInfo->pChildren != NULL) {
...@@ -3385,7 +3393,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3385,7 +3393,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
code = initBasicInfo(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3604,19 +3612,17 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -3604,19 +3612,17 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC; iaInfo->order = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval; iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel; iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId; iaInfo->primaryTsIndex = primaryTsSlotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
int32_t code = int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initAggInfo(&iaInfo->binfo, &pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
...@@ -3625,8 +3631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -3625,8 +3631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
} }
// iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册