提交 aa143bed 编写于 作者: H Haojun Liao

fix(query): fix invalid free.

上级 475a399c
...@@ -266,7 +266,8 @@ extern SAppInfo appInfo; ...@@ -266,7 +266,8 @@ extern SAppInfo appInfo;
extern int32_t clientReqRefPool; extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool; extern int32_t clientConnRefPool;
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
......
...@@ -278,7 +278,6 @@ void taos_init_imp(void) { ...@@ -278,7 +278,6 @@ void taos_init_imp(void) {
return; return;
} }
initMsgHandleFp();
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
rpcInit(); rpcInit();
......
...@@ -535,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { ...@@ -535,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
} }
int32_t handleExecRes(SRequestObj* pRequest) { int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) { if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = 0;
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
if (code) { if (code) {
return code; return code;
} }
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) { switch (pRes->msgType) {
...@@ -565,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) { ...@@ -565,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) {
break; break;
} }
default: default:
tscError("invalid exec result for request type %d", pRequest->type); tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self,
return TSDB_CODE_APP_ERROR; pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR;
} }
return code; return code;
...@@ -585,6 +587,12 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { ...@@ -585,6 +587,12 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
return; return;
} }
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList); removeMeta(pTscObj, pRequest->tableList);
} }
...@@ -623,7 +631,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue ...@@ -623,7 +631,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
} }
handleExecRes(pRequest); handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
...@@ -884,7 +892,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -884,7 +892,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
......
...@@ -748,7 +748,6 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { ...@@ -748,7 +748,6 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
static void fetchCallback(void* pResult, void* param, int32_t code) { static void fetchCallback(void* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj* pRequest = (SRequestObj*) param;
......
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
#include "tdef.h" #include "tdef.h"
#include "tname.h" #include "tname.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) { static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code; pRequest->code = code;
terrno = code; terrno = code;
...@@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { ...@@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo; return pMsgSendInfo;
} }
...@@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL); assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
...@@ -285,13 +280,21 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -285,13 +280,21 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
// todo refactor: this arraylist is too large switch (msgType) {
void initMsgHandleFp() { case TDMT_MND_CONNECT:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; return processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; case TDMT_MND_CREATE_DB:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; return processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; case TDMT_MND_USE_DB:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; return processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp; case TDMT_MND_CREATE_STB:
return processCreateSTableRsp;
case TDMT_MND_DROP_DB:
return processDropDbRsp;
case TDMT_MND_ALTER_STB:
return processAlterStbRsp;
default:
return genericRspCallback;
}
} }
...@@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, ...@@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
} }
} }
taosArrayDestroy(dbCfg.pRetensions);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册