diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d0cb9af7104a0b36a0f99efd55028f48062fc6ac..fd91742789a1beca30992023921b2bd30e8b9a08 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -193,7 +193,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) - +#define CONN_RELEASE_BY_SERVER(conn) ((conn)->status == ConnRelease && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) @@ -238,26 +238,26 @@ void cliHandleResp(SCliConn* conn) { if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); - pCtx = pMsg ? pMsg->ctx: NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - } - tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); - } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); - } + pCtx = pMsg ? pMsg->ctx : NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } + tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); + } else { + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); + } } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); if (pMsg == NULL) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType); - if (transMsg.ahandle == NULL) { - tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); + if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle = NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); } } else { pCtx = pMsg ? pMsg->ctx : NULL; @@ -284,6 +284,11 @@ void cliHandleResp(SCliConn* conn) { // transUnrefCliHandle(conn); return; } + if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { + tTrace("except, server continue send while cli ignore it"); + // transUnrefCliHandle(conn); + return; + } if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); @@ -320,7 +325,7 @@ void cliHandleExcept(SCliConn* pConn) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - while(!transQueueEmpty(&pConn->cliMsgs)){ + while (!transQueueEmpty(&pConn->cliMsgs)) { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;