diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b021651c16640e8ee48554d59f735be45515ec86..57d4031835817e5615b4561ac7afd0f7b7714dc8 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); +void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 596c3d3fbf7fbc1ad975fea4eb88a02cd0ae7ab5..819c50275b737238f11b74b19c3fb441a5d38aaf 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) { taosMemoryFreeClear(pTscObj); } -void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) { +void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return NULL; } + pObj->connType = connType; pObj->pAppInfo = pAppInfo; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 97c7d2bad1bcf54fad6c4a49922b5e17c06f7166..53aebe751b0282c416ed547617d9df899ca3154e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -25,7 +25,7 @@ #include "tref.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static bool stringLengthCheck(const char* str, size_t maxsize) { @@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType) { - STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); + STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return pTscObj; @@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return NULL; } - SMsgSendInfo* body = buildConnectMsg(pRequest, connType); + SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); @@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { } taosMemoryFreeClear(db); - connectReq.connType = connType; - connectReq.pid = htonl(appInfo.pid); + connectReq.connType = pObj->connType; + connectReq.pid = htonl(appInfo.pid); connectReq.startTime = htobe64(appInfo.startTime); + tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user)); tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd)); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9d75586917a7774a449798d090656de461873cd6..ae6aee13d51777e141bafc438084a155ae7f148d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - // TODO + if (taos == NULL || sql == NULL) { + // todo directly call fp + } + + taos_query_l(taos, sql, (int32_t) strlen(sql)); } void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 98559f974d4fc9a537d77389d5052dc0425d8f72..93e81aa70ee2d9d7a3658adc86d4bd3aa55315ad 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -62,11 +62,6 @@ enum { * 2. when all data within queried time window, it is also denoted as query_completed */ TASK_COMPLETED = 0x2u, - - /* when the result is not completed return to client, this status will be - * usually used in case of interval query with interpolation option - */ - TASK_OVER = 0x4u, }; typedef struct SResultRowCell { @@ -288,12 +283,6 @@ typedef struct SOperatorInfo { SOperatorFpSet fpSet; } SOperatorInfo; -typedef struct { - int32_t numOfTags; - int32_t numOfCols; - SColumnInfo* colList; -} SQueriedTableInfo; - typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, - SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, @@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput); #endif int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); -void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ba779509122f5bc7d9feb738c485f8eebddca01c..3cc75a815d0a31f4c0ebbb6a83650271de13b785 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER); + return isTaskKilled(pTaskInfo); } void qDestroyTask(qTaskInfo_t qTaskHandle) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 005d39b525b709213f374721cb2e87d68b76d9f2..a2aaac1f50ee0334932c5454e2113dfcaed1d25a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); -static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); @@ -922,6 +921,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { if (IS_VAR_DATA_TYPE(type)) { // todo disable this + // if (pResultRow->key == NULL) { // pResultRow->key = taosMemoryMalloc(varDataTLen(pData)); // varDataCopy(pResultRow->key, pData); @@ -1451,8 +1451,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in } } -#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) - // static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, // SqlFunctionCtx *pCtx, int32_t numOfRows) { // STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -2009,8 +2007,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } // todo merged with the build group result. -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, - SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { +void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, + int32_t* rowCellInfoOffset) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) { SResultRowPosition* pPos = &pResultRowInfo->pPosition[i]; @@ -2023,17 +2021,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD // } for (int32_t j = 0; j < numOfOutput; ++j) { - pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); - - struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; + struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset); if (!isRowEntryInitialized(pResInfo)) { continue; } - if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr. - // pCtx[j].fpSet.finalize(&pCtx[j]); - } - if (pRow->numOfRows < pResInfo->numOfRes) { pRow->numOfRows = pResInfo->numOfRes; } @@ -3682,10 +3674,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOptrBasicInfo* pInfo = &pAggInfo->binfo; - int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; - bool newgroup = true; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3698,6 +3688,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } + int32_t order = getTableScanOrder(pOperator); + // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->pScalarExprInfo != NULL) { int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, @@ -3730,8 +3722,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, - &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, + pAggInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); OPTR_SET_OPENED(pOperator); @@ -4014,7 +4006,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } } - // todo dynamic set tags + // todo set tags + // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); @@ -4028,7 +4021,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, pTaskInfo->code); } @@ -4279,11 +4271,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { STableKeyInfo* pk = taosArrayGet(pa, j); - STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; - // pTQueryInfo->uid = pk->uid; pTQueryInfo->lastKey = pk->lastKey; - // pTQueryInfo->groupIndex = i; } } @@ -4302,7 +4291,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t numOfRows = 1; + int32_t numOfRows = 10; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, numOfRows); @@ -4313,9 +4302,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pOperator->resultInfo.capacity = 4096; - pOperator->resultInfo.threshold = 4096 * 0.75; - int32_t numOfGroup = 10; // todo replaced with true value pInfo->groupId = INT32_MIN; initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup); @@ -4545,42 +4531,6 @@ _error: return NULL; } -static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { - int32_t j = 0; - - if (TSDB_COL_IS_TAG(pExpr->pParam[0].pCol->type)) { - if (pExpr->pParam[0].pCol->colId == TSDB_TBNAME_COLUMN_INDEX) { - return TSDB_TBNAME_COLUMN_INDEX; - } - - while (j < pTableInfo->numOfTags) { - if (pExpr->pParam[0].pCol->colId == pTagCols[j].colId) { - return j; - } - - j += 1; - } - - } /*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data - return TSDB_UD_COLUMN_INDEX; - } else { - while (j < pTableInfo->numOfCols) { - if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) { - return j; - } - - j += 1; - } - }*/ - - return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value -} - -bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { - int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols); - return j != INT32_MIN; -} - static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; @@ -4739,7 +4689,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); -static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode); +static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { SInterval interval = { @@ -5240,28 +5190,6 @@ _complete: return code; } -static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, - int32_t numOfOutput, int32_t tagLen, bool superTable) { - for (int32_t i = 0; i < numOfOutput; ++i) { - int16_t functId = getExprFunctionId(&pExprs[i]); - - if (functId == FUNCTION_TOP || functId == FUNCTION_BOTTOM) { - int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); - if (j < 0 || j >= pTableInfo->numOfCols) { - return TSDB_CODE_QRY_INVALID_MSG; - } else { - SColumnInfo* pCol = &pTableInfo->colList[j]; - // int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i, - // &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes, - // &pExprs[i].base.interBytes, tagLen, superTable, NULL); - // assert(ret == TSDB_CODE_SUCCESS); - } - } - } - - return TSDB_CODE_SUCCESS; -} - void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2ba3e257b2f4af5fa141fb88196f8b37def6db2b..e3a507bf7cdc7e035039042e6878e004d525b2d5 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, - &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, + pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ebde1c79970c70ba37c2b80f3153060d6493c6a3..738f4821bde2dd153a4b38457e31bab5977510ba 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, - &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, + pInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); OPTR_SET_OPENED(pOperator); @@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);