diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5e3860822ec161ef52eece2931b7c71dee4fb40d..101092f13e004286eef466d74598444ef2689157 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -86,7 +86,7 @@ typedef struct SRpcInit { int32_t rpcInit(); void rpcCleanup(); -void *rpcOpen(const SRpcInit *pRpc); +void * rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); @@ -99,6 +99,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +void rpcRefHandle(void *handle, int8_t type); +void rpcUnrefHandle(void *handle, int8_t type); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index cd6c78cbdcd7fc293e4fdac3d998a1499558d09c..f2ac77fe61e41cbc8d041cfb82d80d3a39f0e505 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -242,8 +242,11 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); int transSetConnOption(uv_tcp_t* stream); -// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); -// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); +void transRefSrvHandle(void* handle); +void transUnrefSrvHandle(void* handle); + +void transRefCliHandle(void* handle); +void transUnrefCliHandle(void* handle); #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b45683617f6343f5df79fb85e95e44aac3a8c056..f3e04173972a63a7662078901d1aa8335c158f5e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -122,4 +122,17 @@ void rpcCleanup(void) { // return; } + +void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; + +void rpcRefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosRefHandle[type])(handle); +} +void rpcUnrefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosUnRefHandle[type])(handle); +} + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a417a57436c9b23678aa477028c4dd748c2d9c87..0d3946d967bb745bd82794590ffe0fb238e95eec 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -351,14 +351,11 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - // transDestroyBuffer(&conn->readBuf); free(conn->stream); free(conn->writeReq); - tTrace("client conn %p destroy successfully", conn); + tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); - - // clientConnDestroy(conn, false); } static void clientWriteCb(uv_write_t* req, int status) { @@ -454,8 +451,7 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("client msg tran time cost: %" PRIu64 "us", el); - et = taosGetTimestampUs(); + tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pTransInst = pThrd->pTransInst; @@ -630,8 +626,6 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { static void clientSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); - msg->ctx = NULL; // - transSendAsync(thrd->asyncPool, &msg->q); } void taosCloseClient(void* arg) { @@ -650,6 +644,23 @@ static int clientRBChoseIdx(SRpcInfo* pTransInst) { } return index % pTransInst->numOfThreads; } +void transRefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_INC((SCliConn*)handle); + UNUSED(ref); +} +void transUnrefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SCliConn*)handle); + if (ref == 0) { + } + + // unref cli handle +} void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index cd02d3bd77d45c2fdbc483f3e0b67c49eebbce51..432ec472fbe55786a3d6e25f37c8cc76657b2b34 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -33,11 +33,11 @@ typedef struct SSrvConn { void* ahandle; // void* hostThrd; SArray* srvMsgs; - // void* pSrvMsg; + + bool broken; // conn broken; struct sockaddr_in addr; struct sockaddr_in locaddr; - // SRpcMsg sendMsg; // del later char secured; @@ -206,7 +206,6 @@ static void uvHandleReq(SSrvConn* pConn) { } pConn->inType = pHead->msgType; - // assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; pHead->code = htonl(pHead->code); @@ -230,7 +229,8 @@ static void uvHandleReq(SSrvConn* pConn) { rpcMsg.handle = pConn; transClearBuffer(&pConn->readBuf); - pConn->ref++; + + transRefSrvHandle(pConn); tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); @@ -255,23 +255,20 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (nread == UV_EOF) { - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - destroyConn(conn, true); - return; - } if (nread == 0) { return; } - if (nread < 0 || nread != UV_EOF) { - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - destroyConn(conn, true); + + tError("server conn %p read error: %s", conn, uv_err_name(nread)); + if (nread < 0 || nread == UV_EOF) { + conn->broken = true; + transUnrefSrvHandle(conn); + + // if (conn->ref > 1) { + // conn->ref++; // ref > 1 signed that write is in progress + //} + // tError("server conn %p read error: %s", conn, uv_err_name(nread)); + // destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -304,10 +301,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - // - destroyConn(conn, true); + conn->broken = false; + transUnrefSrvHandle(conn); } - // opt } static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { @@ -353,15 +349,18 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; uv_timer_stop(pConn->pTimer); - - // pConn->pSrvMsg = smsg; - // conn->pWriter->data = smsg; uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); } static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; - pConn->ref--; // + + if (pConn->broken == true) { + transUnrefSrvHandle(pConn); + return; + } + transUnrefSrvHandle(pConn); + if (taosArrayGetSize(pConn->srvMsgs) > 0) { tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); @@ -386,7 +385,8 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); - destroyConn(c, true); + transUnrefSrvHandle(c); + // destroyConn(c, true); } } void uvWorkerAsyncCb(uv_async_t* handle) { @@ -394,11 +394,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = item->pThrd; SSrvConn* conn = NULL; queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); - // pthread_mutex_unlock(&mtx); while (!QUEUE_IS_EMPTY(&wq)) { queue* head = QUEUE_HEAD(&wq); @@ -411,7 +411,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } if (msg->pConn == NULL) { free(msg); - destroyAllConn(pThrd); uv_loop_close(pThrd->loop); @@ -601,7 +600,9 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); - ++pConn->ref; + + pConn->broken = false; + transRefSrvHandle(pConn); return pConn; } @@ -609,10 +610,6 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref); - if (--conn->ref > 0) { - return; - } transDestroyBuffer(&conn->readBuf); for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { @@ -624,9 +621,9 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (clear) { tTrace("try to destroy conn %p", conn); - uv_tcp_close_reset(conn->pTcp, uvDestroyConn); - // uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); - // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); + // uv_tcp_close_reset(conn->pTcp, uvDestroyConn); + uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); + uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); // uv_unref((uv_handle_t*)conn->pTcp); // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } @@ -722,8 +719,6 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); free(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); - - // free(pThrd->workerAsync); free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { @@ -757,6 +752,27 @@ void taosCloseServer(void* arg) { free(srv); } +void transRefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + SSrvConn* conn = handle; + + int ref = T_REF_INC((SSrvConn*)handle); + UNUSED(ref); +} + +void transUnrefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SSrvConn*)handle); + + if (ref == 0) { + destroyConn((SSrvConn*)handle, true); + } + // unref srv handle +} void rpcSendResponse(const SRpcMsg* pMsg) { if (pMsg->handle == NULL) { return;