提交 7b30a6ee 编写于 作者: dengyihao's avatar dengyihao

free unfinished ahandle while quit

上级 495bfe93
......@@ -199,6 +199,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
void destroyAhandle(void* ahandle);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* ctx);
......
......@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
......@@ -97,6 +98,9 @@ typedef struct SRpcInit {
// set up timeout for particular msg
RpcTfp tfp;
// destroy client ahandle;
RpcDfp dfp;
void *parent;
} SRpcInit;
......
......@@ -140,12 +140,13 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
// rpcInit.tfp = clientRpcTfp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.dfp = destroyAhandle;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
......
......@@ -146,6 +146,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
taosMemoryFreeClear(pMsgBody);
}
void destroyAhandle(void *ahandle) {
SMsgSendInfo *pSendInfo = ahandle;
if (pSendInfo == NULL) return;
destroySendMsgInfo(pSendInfo);
}
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) {
......
......@@ -53,6 +53,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle);
int index;
void* parent;
......
......@@ -53,6 +53,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
......
......@@ -143,6 +143,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static void cliHandleResp(SCliConn* conn);
// handle except about conn
static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
......@@ -163,17 +164,6 @@ static void destroyThrdObj(SCliThrd* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle);
}
}
destroyCmsg(msg);
}
}
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
......@@ -266,6 +256,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
static void* cliWorkThread(void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle);
continue;
}
if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) {
tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
pTransInst->destroyFp(msg->ctx->ahandle);
}
}
destroyCmsg(msg);
}
}
bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册