未验证 提交 c76e218d 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #15992 from taosdata/fix/rpcDeadlock

fix client deadlock
...@@ -211,27 +211,27 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -211,27 +211,27 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \ uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \
if (cliMsg->type == Release) return; \ if (cliMsg->type == Release) return; \
} \ } \
tDebug("%s conn %p receive release request, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \ tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \
if (T_REF_VAL_GET(conn) > 1) { \ if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \ transUnrefCliHandle(conn); \
} \ } \
destroyCmsg(pMsg); \ destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \ cliReleaseUnfinishedMsg(conn); \
transQueueClear(&conn->cliMsgs); \ transQueueClear(&conn->cliMsgs); \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \ return; \
} \ } \
} while (0) } while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
...@@ -890,8 +890,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -890,8 +890,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
if (refId != 0) { if (refId != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) { if (exh == NULL) {
tError("failed to get conn, refId: %" PRId64 "", refId);
*ignore = true; *ignore = true;
destroyCmsg(pMsg);
return NULL; return NULL;
} else { } else {
conn = exh->handle; conn = exh->handle;
...@@ -937,7 +937,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -937,7 +937,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
bool ignore = false; bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) { if (ignore == true) {
tError("ignore msg"); // persist conn already release by server
STransMsg resp = {0};
resp.code = TSDB_CODE_RPC_BROKEN_LINK;
resp.msgType = pMsg->msg.msgType + 1;
resp.info.ahandle = pMsg && pMsg->ctx ? pMsg->ctx->ahandle : NULL;
resp.info.traceId = pMsg->msg.info.traceId;
pTransInst->cfp(pTransInst->parent, &resp, NULL);
destroyCmsg(pMsg);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册