diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9eea43be2354e5407a31a7bd10d9f215d8dd0cb5..d78e79f96fc1e4faf7071ecd3089639426972f76 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); -static int sockDebugInfo(struct sockaddr* sockname, char* dst) { - struct sockaddr_in addr = *(struct sockaddr_in*)sockname; - - char buf[16] = {0}; - int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); - sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); - return r; -} // register timer for read static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn @@ -121,12 +113,14 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after write data to socket static void cliSendCb(uv_write_t* req, int status); -// callback after conn to server +// callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); +static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); + static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; + if (cliRecvReleaseReq(conn, pHead)) { + return; + } CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { @@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle); } } else { - pCtx = pMsg ? pMsg->ctx : NULL; + pCtx = pMsg->ctx; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); } @@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) { } STraceId* trace = &transMsg.info.traceId; - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code)); @@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) { if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } +bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { + if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + uint64_t ahandle = pHead->ahandle; + SCliMsg* pMsg = NULL; + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + transClearBuffer(&conn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { + SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); + if (cliMsg->type == Release) return true; + } + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); + if (T_REF_VAL_GET(conn) > 1) { + transUnrefCliHandle(conn); + } + destroyCmsg(pMsg); + cliReleaseUnfinishedMsg(conn); + transQueueClear(&conn->cliMsgs); + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); + return true; + } + return false; +} + static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4d35e346b17d81e108d33e5c2f1e6da376047f5e..1b51df202f673300ad17d7b607133baefa6b676a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvPrepareCb(uv_prepare_t* handle); +static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead); + /* * time-consuming task throwed into BG work thread */ @@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status); static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvFreeCb(uv_handle_t* handle); -static void uvStartSendRespInternal(SSvrMsg* smsg); +static void uvStartSendRespImpl(SSvrMsg* smsg); static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSvrMsg* msg); @@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - reallocConnRef(conn); \ - tTrace("conn %p received release request", conn); \ - \ - STraceId traceId = head->traceId; \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - \ - STransMsg tmsg = { \ - .code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - #define SRV_RELEASE_UV(loop) \ do { \ uv_walk(loop, uvWalkCb, NULL); \ @@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) { // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - CONN_SHOULD_RELEASE(pConn, pHead); + if (uvRecvReleaseReq(pConn, pHead)) { + return; + } STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); @@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) { msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg != NULL) { - uvStartSendRespInternal(msg); + uvStartSendRespImpl(msg); } } else { - uvStartSendRespInternal(msg); + uvStartSendRespImpl(msg); } } } @@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { wb->len = len; } -static void uvStartSendRespInternal(SSvrMsg* smsg) { +static void uvStartSendRespImpl(SSvrMsg* smsg) { SSvrConn* pConn = smsg->pConn; if (pConn->broken) { return; @@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) { if (!transQueuePush(&pConn->srvMsgs, smsg)) { return; } - uvStartSendRespInternal(smsg); + uvStartSendRespImpl(smsg); return; } @@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, uvDestroyConn); taosMemoryFree(req); } +static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { + if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + reallocConnRef(pConn); + tTrace("conn %p received release request", pConn); + + STraceId traceId = pHead->traceId; + pConn->status = ConnRelease; + transClearBuffer(&pConn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + + STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + if (!transQueuePush(&pConn->srvMsgs, srvMsg)) { + return true; + } + if (pConn->regArg.init) { + tTrace("conn %p release, notify server app", pConn); + STrans* pTransInst = pConn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); + } + uvStartSendRespImpl(srvMsg); + return true; + } + return false; +} static void uvPrepareCb(uv_prepare_t* handle) { // prepare callback SWorkThrd* pThrd = handle->data; @@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { if (!transQueuePush(&conn->srvMsgs, msg)) { return; } - uvStartSendRespInternal(msg); + uvStartSendRespImpl(msg); return; } else if (conn->status == ConnRelease || conn->status == ConnNormal) { tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);