diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3b8ea5858fb18d80038977915f05c3999c396dfd..9af26f9d6712b0de9d0b42355ef7516cb7bf61e1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -154,20 +154,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ - do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ - for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ - break; \ - } \ - } \ - if (i == sz) { \ - pMsg = NULL; \ - } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ - } \ +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ + if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 787c538f2a3f8d3689ed4de7c1165f60cba7009e..6b0413c40e8539ead7b210160a2595df40e3519b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -644,6 +644,7 @@ static void uvDestroyConn(uv_handle_t* handle) { // free(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { + tTrace("work thread quit"); uv_loop_close(thrd->loop); uv_stop(thrd->loop); } @@ -705,12 +706,12 @@ End: return NULL; } void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { + thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_loop_close(thrd->loop); uv_stop(thrd->loop); } else { destroyAllConn(thrd); - thrd->quit = true; } free(msg); } @@ -773,15 +774,16 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { void transCloseServer(void* arg) { // impl later SServerObj* srv = arg; - for (int i = 0; i < srv->numOfThreads; i++) { - sendQuitToWorkThrd(srv->pThreadObj[i]); - destroyWorkThrd(srv->pThreadObj[i]); - } tDebug("send quit msg to accept thread"); uv_async_send(srv->pAcceptAsync); taosThreadJoin(srv->thread, NULL); + for (int i = 0; i < srv->numOfThreads; i++) { + sendQuitToWorkThrd(srv->pThreadObj[i]); + destroyWorkThrd(srv->pThreadObj[i]); + } + free(srv->pThreadObj); free(srv->pAcceptAsync); free(srv->loop);