diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 0f12880272d0d10aebdc00482bafe4972691b60a..32668356eed5400c5e54be5712ff43de39cb6e6d 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -43,13 +43,23 @@ extern "C" { } \ } while (0) +#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms +// todo refactor enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, }; +enum { + RES_TYPE__QUERY = 1, + RES_TYPE__TMQ, +}; + +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) + typedef struct SAppInstInfo SAppInstInfo; typedef struct { @@ -172,33 +182,15 @@ typedef struct SReqResultInfo { int32_t payloadLen; } SReqResultInfo; -typedef struct SShowReqInfo { - int64_t execId; // showId/queryId - int32_t vgId; - SArray* pArray; // SArray - int32_t currentIndex; // current accessed vgroup index. -} SShowReqInfo; - typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; - SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; -#define ERROR_MSG_BUF_DEFAULT_SIZE 512 - -enum { - RES_TYPE__QUERY = 1, - RES_TYPE__TMQ, -}; - -#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) - typedef struct { int8_t resType; char* topic; @@ -217,7 +209,6 @@ typedef struct SRequestObj { int32_t sqlLen; int64_t self; char* msgBuf; - void* pInfo; // sql parse info, generated by parser module int32_t code; SArray* dbList; SArray* tableList; @@ -252,7 +243,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -int taos_init(); +int taos_init(); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); @@ -266,7 +257,6 @@ char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void resetConnectDB(STscObj* pTscObj); -void taos_init_imp(void); int taos_options_imp(TSDB_OPTION option, const char* str); void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); @@ -286,8 +276,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, - bool convertUcs4); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fec6c8e5db5d53e15f46b506aff9bcf1c1807246..a258fd305e0f2658ba7e7e952eeb2e3ebe138ccd 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -33,7 +33,7 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; +static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { @@ -188,7 +188,6 @@ static void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); - taosMemoryFreeClear(pRequest->pInfo); taosMemoryFreeClear(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); @@ -198,10 +197,6 @@ static void doDestroyRequest(void *p) { schedulerFreeJob(pRequest->body.queryJob); } - if (pRequest->body.showInfo.pArray != NULL) { - taosArrayDestroy(pRequest->body.showInfo.pArray); - } - taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 96a7230ff3df8ef336d457b8a23231522f2ad216..b3615c208c485be803fe55e30ee6e5445898c4e6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include "clientInt.h" #include "clientLog.h" diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 42d204284e8693538ab64a4135d3d08066d8d37d..96ef8d0d21857d935c6b98425d36acc2e72f4eea 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -193,7 +193,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } } else { - // assert to avoid uninitialization error + // assert to avoid un-initialization error ASSERT(0); } return NULL; @@ -355,6 +355,7 @@ int taos_result_precision(TAOS_RES *res) { if (res == NULL) { return TSDB_TIME_PRECISION_MILLI; } + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; return pRequest->body.resInfo.precision; @@ -467,6 +468,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { if (res == NULL) { return 0; } + if (TD_RES_TMQ(res)) { SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); if (pResultInfo == NULL) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 67c5679cac5156b21ce0b892e6d0d824854c0f6a..87a8e86415d4a63bd6b3e7f0636e8226298684b2 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -117,10 +117,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { struct SCatalog *pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { + int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, - tstrerror(code)); + tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -154,10 +154,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { } else { struct SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { + int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, - tstrerror(code)); + tstrerror(code1)); } else { catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); } @@ -209,84 +209,9 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { } void initMsgHandleFp() { -#if 0 - tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; - tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; - tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; - - tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; - tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; - tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg; - - tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg; - tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg; - - tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg; - tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; - tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; - tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; - tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg; - tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg; - tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; - tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; - tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; - tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; - tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; - tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; - tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg; - tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; - tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg; - - - tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; - tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; - - tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; - tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg; - tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; - tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; - tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; - - tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp; - tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode; - - tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp; - tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp; - - tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; - tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; - tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; - tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; - - tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. - tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; - - tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp; - - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; - - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; - - tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; - tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; - tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp; - - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; -#endif - - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 45674d31bccf019450d318bba25491dcee02d2f5..20b71698952d32a463376319b8633d1163d2c34b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6815,6 +6815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); } + SOperatorInfo* pOptr = NULL; if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); @@ -6822,7 +6823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); @@ -6836,9 +6837,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); } - return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); + pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { - return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -6856,39 +6857,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); + pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); - return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); + pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -6901,7 +6902,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ - return NULL; + + taosMemoryFree(ops); + return pOptr; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 67871dfe6241b0b1c3e48856f6068e96d4d09916..a9a360e03e05049f802853a25131c956bd85524b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1020,7 +1020,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (ctx->phase == QW_PHASE_PRE_QUERY) { - ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle; + ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle; ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); needRsp = false;