diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c760acd52e80443d2a4bb7b5874ce7258e687798..f0b54a82e75888bd45b237b9a439a66c0732f112 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,9 +123,9 @@ typedef struct { } SRpcReqContext; typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app + SRpcInfo* pTransInst; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app // struct SRpcConn* pConn; // pConn allocated tmsg_t msgType; // message type uint8_t* pCont; // content provided by app diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bfadfe56eff19e4738d53a9f1209bac421d16095..e5b80d84196fdb5b8d9a1b0bcf823c170ac12621 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { char spi; char secured; uint64_t expireTime; + int8_t notifyCount; // timers already notify to client } SCliConn; typedef struct SCliMsg { @@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); -// process data read from server, auth/decompress etc later -static void clientHandleResp(SCliConn* conn); // check whether already read complete packet from server static bool clientReadComplete(SConnBuffer* pBuf); // alloc buf for read @@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void clientMsgDestroy(SCliMsg* pMsg); +// process data read from server, auth/decompress etc later +static void clientHandleResp(SCliConn* conn); +// handle except about conn +static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientMsgDestroy(SCliMsg* pMsg); +static void destroyTransConnCtx(STransConnCtx* ctx); // thread obj static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); @@ -100,22 +104,38 @@ static void* clientThread(void* arg); static void clientHandleResp(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = pCtx->pRpc; + SRpcInfo* pRpc = pCtx->pTransInst; SRpcMsg rpcMsg; rpcMsg.pCont = conn->readBuf.buf; rpcMsg.contLen = conn->readBuf.len; rpcMsg.ahandle = pCtx->ahandle; (pRpc->cfp)(NULL, &rpcMsg, NULL); + conn->notifyCount += 1; SCliThrdObj* pThrd = conn->hostThrd; addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + + // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } - free(pCtx->ip); - free(pCtx); - // impl + destroyTransConnCtx(pCtx); +} +static void clientHandleExcept(SCliConn* pConn) { + SCliMsg* pMsg = pConn->data; + + STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pRpc = pCtx->pTransInst; + + SRpcMsg rpcMsg; + rpcMsg.ahandle = pCtx->ahandle; + // SRpcInfo* pRpc = pMsg->ctx->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, NULL); + + pConn->notifyCount += 1; + destroyTransConnCtx(pCtx); + clientConnDestroy(pConn, true); } static void clientTimeoutCb(uv_timer_t* handle) { @@ -191,6 +211,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->notifyCount = 0; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -246,19 +267,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - tDebug("alread read complete"); + tDebug("conn read complete"); clientHandleResp(conn); } else { - tDebug("read half packet, continue to read"); + tDebug("conn read half packet, continue to read"); } return; } assert(nread <= 0); if (nread == 0) { + tError("conn closed"); return; } - if (nread != UV_EOF) { - tDebug("read error %s", uv_err_name(nread)); + if (nread < 0) { + tError("conn read error: %s", uv_err_name(nread)); + clientHandleExcept(conn); } // tDebug("Read error %s\n", uv_err_name(nread)); // uv_close((uv_handle_t*)handle, clientDestroy); @@ -282,19 +305,20 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + if (status == 0) { - tDebug("data already was written on stream"); + tDebug("conn data already was written out"); } else { tError("failed to write: %s", uv_err_name(status)); - clientConnDestroy(pConn, true); + clientHandleExcept(pConn); return; } SCliThrdObj* pThrd = pConn->hostThrd; - if (pConn->stream == NULL) { - pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); - pConn->stream->data = pConn; - } + // if (pConn->stream == NULL) { + // pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); + // pConn->stream->data = pConn; + //} uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); // impl later } @@ -316,23 +340,10 @@ static void clientWrite(SCliConn* pConn) { static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; - SCliMsg* pMsg = pConn->data; - - STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pRpc; - if (status != 0) { // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); tError("failed to connect server, errmsg: %s", uv_strerror(status)); - // call user fp later - SRpcMsg rpcMsg; - rpcMsg.ahandle = pCtx->ahandle; - // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pRpc->cfp)(NULL, &rpcMsg, NULL); - - clientConnDestroy(pConn, true); - // uv_close((uv_handle_t*)req->handle, clientDestroy); - return; + clientHandleExcept(pConn); } assert(pConn->stream == req->handle); @@ -462,6 +473,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { free(pThrd->loop); free(pThrd); } + +static void destroyTransConnCtx(STransConnCtx* ctx) { + if (ctx != NULL) { + free(ctx->ip); + } + free(ctx); +} // void taosCloseClient(void* arg) { // impl later @@ -472,7 +490,6 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } - void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]); @@ -487,7 +504,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pRpc = (SRpcInfo*)shandle; + pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index b519a35f2491f4041263de8a03aaa1d7f976a71b..3b7b4d6fea0c6209aad6580e28a8f7eb57c65cff 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -16,6 +16,7 @@ #ifdef USE_UV #include "transComm.h" + typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -226,7 +227,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvProcessData(SConn* pConn) { +static void uvHandleReq(SConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; SConnBuffer* pBuf = &pConn->connBuf; @@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; - tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread); + tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); if (readComplete(pBuf)) { - tDebug("alread read complete packet"); - uvProcessData(conn); + tDebug("conn %p alread read complete packet", conn); + uvHandleReq(conn); } else { - tDebug("read half packet, continue to read"); + tDebug("conn %p read partial packet, continue to read", conn); } return; } if (nread == 0) { + tDebug("conn %p except read", conn); + // destroyConn(conn, true); return; } if (nread != UV_EOF) { - tDebug("read error %s", uv_err_name(nread)); + tDebug("conn %p read error: %s", conn, uv_err_name(nread)); + destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b void uvOnTimeoutCb(uv_timer_t* handle) { // opt - tDebug("time out"); + SConn* pConn = handle->data; + tDebug("conn %p time out", pConn); } void uvOnWriteCb(uv_write_t* req, int status) { @@ -317,9 +322,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { memset(buf->buf, 0, buf->cap); buf->left = -1; if (status == 0) { - tDebug("data already was written on stream"); + tDebug("conn %p data already was written on stream", conn); } else { - tDebug("failed to write data, %s", uv_err_name(status)); + tDebug("conn %p failed to write data, %s", conn, uv_err_name(status)); destroyConn(conn, true); } // opt @@ -334,7 +339,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { // impl later; - tDebug("prepare to send back"); + tDebug("conn %p prepare to send resp", conn); SRpcMsg* pMsg = &conn->sendMsg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); @@ -448,7 +453,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); - tDebug("new connection created: %d", fd); + tDebug("conn %p created, fd: %d", pConn, fd); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { tDebug("failed to create new connection"); @@ -517,17 +522,13 @@ static SConn* createConn() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); return pConn; } -static void connCloseCb(uv_handle_t* handle) { - // impl later - // -} + static void destroyConn(SConn* conn, bool clear) { if (conn == NULL) { return; } if (clear) { - uv_handle_t handle = *((uv_handle_t*)conn->pTcp); - uv_close(&handle, NULL); + uv_close((uv_handle_t*)conn->pTcp, NULL); } uv_timer_stop(conn->pTimer); free(conn->pTimer); @@ -646,6 +647,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { pthread_mutex_lock(&pThrd->connMtx); QUEUE_PUSH(&pThrd->conn, &pConn->queue); pthread_mutex_unlock(&pThrd->connMtx); + tDebug("conn %p start to send resp", pConn); uv_async_send(pConn->pWorkerAsync); }