Unverified commit 7dea2e99, authored by dapan1121, committed by GitHub

Merge pull request #17973 from taosdata/fix/TD-20273

fix: free unfinished ahandle when cli quit
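For orientation (an editor's sketch, not part of the commit): on the client, every request handed to the transport carries an ahandle — an SMsgSendInfo — that is normally freed in the response callback. If the client quits while requests are still queued on a connection, no callback ever fires and those ahandles leak. This change threads an application-supplied destructor through the transport (SRpcInit.dfp → STrans.destroyFp) and invokes it while draining unfinished messages. A minimal, self-contained C sketch of the pattern follows; all names in it are hypothetical stand-ins, not the TDengine API:

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical, simplified mirror of the mechanism in this commit: the app
 * registers a destructor (like SRpcInit.dfp) for its per-request context
 * (the ahandle); on quit, the transport drains still-queued requests and
 * hands each orphaned ahandle to that destructor instead of leaking it. */
typedef void (*destroy_fp)(void *ahandle);

typedef struct {
  void *ahandle; /* app-owned per-request context */
} queued_msg;

typedef struct {
  destroy_fp destroyFp; /* registered once at init time */
  queued_msg queue[8];
  int        nqueued;
} transport;

/* app-side destructor, playing the role of destroyAhandle() in the diff */
static void app_destroy_ahandle(void *ahandle) {
  printf("freeing unfinished ahandle %p\n", ahandle);
  free(ahandle);
}

/* playing the role of cliReleaseUnfinishedMsg(): drain the queue on quit */
static void release_unfinished(transport *t) {
  for (int i = 0; i < t->nqueued; i++) {
    void *ah = t->queue[i].ahandle;
    if (ah != NULL && t->destroyFp != NULL) t->destroyFp(ah);
  }
  t->nqueued = 0;
}

int main(void) {
  transport t = {.destroyFp = app_destroy_ahandle};
  t.queue[t.nqueued++].ahandle = malloc(32); /* request that never completes */
  release_unfinished(&t); /* client quit: ahandle is freed, not leaked */
  return 0;
}
```

In the real diff below, the client registers destroyAhandle via rpcInit.dfp in openTransporter(), and rpcOpen() copies it onto the transport instance.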
@@ -204,6 +204,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);

void destroyAhandle(void* ahandle);

int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* ctx);
......
@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);

typedef struct SRpcInit {
  char localFqdn[TSDB_FQDN_LEN];
@@ -97,6 +98,9 @@ typedef struct SRpcInit {
  // set up timeout for particular msg
  RpcTfp tfp;

  // destroy client ahandle;
  RpcDfp dfp;

  void *parent;
} SRpcInit;
......
@@ -140,12 +140,13 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
  rpcInit.numOfThreads = numOfThread;
  rpcInit.cfp = processMsgFromServer;
  rpcInit.rfp = clientRpcRfp;
  // rpcInit.tfp = clientRpcTfp;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.dfp = destroyAhandle;
  void *pDnodeConn = rpcOpen(&rpcInit);
  if (pDnodeConn == NULL) {
    tscError("failed to init connection to server");
......
@@ -146,6 +146,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  }
  taosMemoryFreeClear(pMsgBody);
}

void destroyAhandle(void *ahandle) {
  SMsgSendInfo *pSendInfo = ahandle;
  if (pSendInfo == NULL) return;

  destroySendMsgInfo(pSendInfo);
}

int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
......
@@ -425,6 +425,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
_return:

  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);

  qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
         tstrerror(rspCode));
@@ -438,6 +439,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
         code);
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  return TSDB_CODE_SUCCESS;
}
@@ -492,6 +494,7 @@ _return:
  tFreeSSchedulerHbRsp(&rsp);

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  SCH_RET(code);
}
......
@@ -439,6 +439,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
  taosMemoryFree(pData->pData);
  taosMemoryFree(pData->pEpSet);
  pData->pData = NULL;
  pData->pEpSet = NULL;

  SCH_RET(code);
@@ -446,6 +448,8 @@ _return:
  taosMemoryFree(pData->pData);
  taosMemoryFree(pData->pEpSet);
  pData->pData = NULL;
  pData->pEpSet = NULL;

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
@@ -942,7 +946,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_RET(schHandleExplainRes(explainRes));
@@ -1115,7 +1119,7 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_RET(schHandleExplainRes(explainRes));
......
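An editor's note on the scheduler hunks above, not part of the commit text: besides freeing the previously leaked pMsg->pEpSet in the callbacks, the redirect path also resets pData->pData and pData->pEpSet to NULL right after freeing them, so a second cleanup pass over the same SDataBuf (e.g. on the failure path) stays harmless. A tiny sketch of that free-and-reset idiom, with a hypothetical helper name:

```c
#include <stdlib.h>

/* Free-and-reset, as in the schHandleRedirect hunk above: NULLing the
 * pointer after free makes a repeated cleanup a no-op (free(NULL) is
 * defined to do nothing), which guards against double frees. */
static void buf_cleanup(void **p) {
  free(*p);
  *p = NULL;
}

int main(void) {
  void *data = malloc(16);
  buf_cleanup(&data); /* first pass frees the buffer */
  buf_cleanup(&data); /* second pass is safe: pointer already NULL */
  return 0;
}
```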
@@ -53,6 +53,7 @@ typedef struct {
  void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
  bool (*retry)(int32_t code, tmsg_t msgType);
  bool (*startTimer)(int32_t code, tmsg_t msgType);
  void (*destroyFp)(void* ahandle);

  int index;
  void* parent;
......
@@ -53,6 +53,7 @@ void* rpcOpen(const SRpcInit* pInit) {
  pRpc->cfp = pInit->cfp;
  pRpc->retry = pInit->rfp;
  pRpc->startTimer = pInit->tfp;
  pRpc->destroyFp = pInit->dfp;

  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
......
@@ -143,6 +143,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static void cliHandleResp(SCliConn* conn);
// handle except about conn
static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
@@ -163,17 +164,6 @@ static void destroyThrdObj(SCliThrd* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);

static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
      }
    }
    destroyCmsg(msg);
  }
}

#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
@@ -266,6 +256,23 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn)
static void* cliWorkThread(void* arg);

static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
      } else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) {
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
        pTransInst->destroyFp(msg->ctx->ahandle);
      }
    }
    destroyCmsg(msg);
  }
}

bool cliMaySendCachedMsg(SCliConn* conn) {
  if (!transQueueEmpty(&conn->cliMsgs)) {
    SCliMsg* pCliMsg = NULL;
......
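Finally, an editor's reading of the transport hunks: cliReleaseUnfinishedMsg is removed from its old spot (the -163,17 hunk) and re-added after the cliWorkThread declaration (the +256,23 hunk) with one new branch. A per-request destructor attached via the rpc context (conn->ctx.freeFunc) still takes precedence; the instance-wide destroyFp fires only as a fallback, and (void*)0x9527 appears to serve as an internal sentinel ahandle that is skipped entirely. A compact sketch of that two-level ownership rule, names hypothetical:

```c
#include <stdio.h>

/* Two-level destructor precedence, mirroring the if/else-if chain in
 * cliReleaseUnfinishedMsg: a request-level free function wins; the
 * instance-level fallback (this commit's addition) runs only when no
 * request-level destructor was supplied. */
typedef void (*free_fn)(void *ahandle);

static void per_request_free(void *ah)  { printf("request-level free %p\n", ah); }
static void per_instance_free(void *ah) { printf("instance fallback free %p\n", ah); }

static void release_ahandle(void *ahandle, free_fn reqFree, free_fn instFree) {
  if (ahandle == NULL) return;
  if (reqFree != NULL) {
    reqFree(ahandle);  /* conn->ctx.freeFunc path */
  } else if (instFree != NULL) {
    instFree(ahandle); /* pTransInst->destroyFp path */
  }                    /* else: ownership stays with the caller */
}

int main(void) {
  int a = 1, b = 2;
  release_ahandle(&a, per_request_free, per_instance_free); /* request-level wins */
  release_ahandle(&b, NULL, per_instance_free);             /* fallback fires */
  return 0;
}
```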