From 3698980392921aa9f2e152f26bab635e5906a769 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 22 Jan 2022 18:45:03 +0800 Subject: [PATCH] refactor code and test reading half packet --- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 71 ++++++++++++++++------------ source/libs/transport/src/transSrv.c | 48 +++++++++++-------- source/libs/transport/test/rclient.c | 8 ++-- source/libs/transport/test/rserver.c | 2 +- 5 files changed, 76 insertions(+), 54 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4b490936cc..bc9a9de318 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -32,6 +32,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->connType = pInit->connType; + pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ffd8d35bfc..bfadfe56ef 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,7 +17,7 @@ #include "transComm.h" -#define CONN_PERSIST_TIME(para) (para * 1000 * 100) +#define CONN_PERSIST_TIME(para) (para * 1000 * 10) typedef struct SCliConn { uv_connect_t connReq; @@ -65,15 +65,15 @@ typedef struct SConnList { // conn pool // add expire timeout and capacity limit -static void* connPoolCreate(int size); -static void* connPoolDestroy(void* pool); +static void* creatConnPool(int size); +static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); // process data read from server, auth/decompress etc later -static void clientProcessData(SCliConn* conn); +static void clientHandleResp(SCliConn* conn); // check whether already read complete packet from server static bool clientReadComplete(SConnBuffer* pBuf); // alloc buf for read @@ -86,9 +86,11 @@ static void clientWriteCb(uv_write_t* req, int status); static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); -static void clientConnDestroy(SCliConn* pConn); +static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void clientMsgDestroy(SCliMsg* pMsg); +// handle req from app +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); // thread obj static SCliThrdObj* createThrdObj(); @@ -96,9 +98,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); // thread static void* clientThread(void* arg); -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); - -static void clientProcessData(SCliConn* conn) { +static void clientHandleResp(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; SRpcInfo* pRpc = pCtx->pRpc; SRpcMsg rpcMsg; @@ -131,7 +131,9 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliConn* c = QUEUE_DATA(h, SCliConn, conn); if (c->expireTime < currentTime) { QUEUE_REMOVE(h); - clientConnDestroy(c); + // uv_stream_t stm = *(c->stream); + // uv_close((uv_handle_t*)&stm, clientDestroy); + clientConnDestroy(c, true); } else { break; } @@ -142,18 +144,18 @@ static void clientTimeoutCb(uv_timer_t* handle) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* connPoolCreate(int size) { - SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - return pool; +static void* creatConnPool(int size) { + // thread local, no lock + return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } -static void* connPoolDestroy(void* pool) { +static void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); QUEUE_REMOVE(h); SCliConn* c = QUEUE_DATA(h, SCliConn, conn); - clientConnDestroy(c); + clientConnDestroy(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); } @@ -245,28 +247,37 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf pBuf->len += nread; if (clientReadComplete(pBuf)) { tDebug("alread read complete"); - clientProcessData(conn); + clientHandleResp(conn); } else { - tDebug("read halp packet, continue to read"); + tDebug("read half packet, continue to read"); } return; } - + assert(nread <= 0); + if (nread == 0) { + return; + } if (nread != UV_EOF) { - tDebug("Read error %s\n", uv_err_name(nread)); + tDebug("read error %s", uv_err_name(nread)); } - // - uv_close((uv_handle_t*)handle, clientDestroy); + // tDebug("Read error %s\n", uv_err_name(nread)); + // uv_close((uv_handle_t*)handle, clientDestroy); } -static void clientConnDestroy(SCliConn* conn) { - // impl later - // +static void clientConnDestroy(SCliConn* conn, bool clear) { + tDebug("conn %p destroy", conn); + if (clear) { + uv_close((uv_handle_t*)conn->stream, NULL); + } + free(conn->stream); + free(conn->readBuf.buf); + free(conn->writeReq); + free(conn); } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - QUEUE_REMOVE(&conn->conn); - clientConnDestroy(conn); + // QUEUE_REMOVE(&conn->conn); + clientConnDestroy(conn, false); } static void clientWriteCb(uv_write_t* req, int status) { @@ -274,7 +285,8 @@ static void clientWriteCb(uv_write_t* req, int status) { if (status == 0) { tDebug("data already was written on stream"); } else { - uv_close((uv_handle_t*)pConn->stream, clientDestroy); + tError("failed to write: %s", uv_err_name(status)); + clientConnDestroy(pConn, true); return; } SCliThrdObj* pThrd = pConn->hostThrd; @@ -317,7 +329,9 @@ static void clientConnCb(uv_connect_t* req, int status) { rpcMsg.ahandle = pCtx->ahandle; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pRpc->cfp)(NULL, &rpcMsg, NULL); - uv_close((uv_handle_t*)req->handle, clientDestroy); + + clientConnDestroy(pConn, true); + // uv_close((uv_handle_t*)req->handle, clientDestroy); return; } @@ -421,7 +435,6 @@ static void clientMsgDestroy(SCliMsg* pMsg) { } static SCliThrdObj* createThrdObj() { SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); - QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); @@ -436,7 +449,7 @@ static SCliThrdObj* createThrdObj() { uv_timer_init(pThrd->loop, pThrd->pTimer); pThrd->pTimer->data = pThrd; - pThrd->pool = connPoolCreate(1); + pThrd->pool = creatConnPool(1); return pThrd; } static void destroyThrdObj(SCliThrdObj* pThrd) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index dafb809e2a..b519a35f24 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -14,8 +14,8 @@ */ #ifdef USE_UV -#include "transComm.h" +#include "transComm.h" typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -84,12 +84,12 @@ static void uvWorkerAsyncCb(uv_async_t* handle); static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); -// already read complete packet -static bool readComplete(SConnBuffer* buf); +// check whether already read complete packet +static bool readComplete(SConnBuffer* buf); +static SConn* createConn(); +static void destroyConn(SConn* conn, bool clear /*clear handle or not*/); -static SConn* connCreate(); -static void connDestroy(SConn* conn); -static void uvConnDestroy(uv_handle_t* handle); +static void uvDestroyConn(uv_handle_t* handle); // server and worker thread static void* workerThread(void* arg); @@ -283,6 +283,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; + tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread); if (readComplete(pBuf)) { tDebug("alread read complete packet"); uvProcessData(conn); @@ -291,11 +292,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } + if (nread == 0) { + return; + } if (nread != UV_EOF) { - tDebug("Read error %s", uv_err_name(nread)); + tDebug("read error %s", uv_err_name(nread)); } - tDebug("read error %s", uv_err_name(nread)); - uv_close((uv_handle_t*)cli, uvConnDestroy); } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = malloc(sizeof(char)); @@ -318,7 +320,7 @@ void uvOnWriteCb(uv_write_t* req, int status) { tDebug("data already was written on stream"); } else { tDebug("failed to write data, %s", uv_err_name(status)); - connDestroy(conn); + destroyConn(conn, true); } // opt } @@ -331,7 +333,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { - // impl later + // impl later; + tDebug("prepare to send back"); SRpcMsg* pMsg = &conn->sendMsg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); @@ -394,6 +397,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); + free(cli); } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -403,7 +407,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("read error %s", uv_err_name(nread)); } // TODO(log other failure reason) - uv_close((uv_handle_t*)q, NULL); + // uv_close((uv_handle_t*)q, NULL); return; } // free memory allocated by @@ -422,7 +426,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SConn* pConn = connCreate(); + SConn* pConn = createConn(); pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); @@ -448,7 +452,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { tDebug("failed to create new connection"); - connDestroy(pConn); + destroyConn(pConn, true); } } @@ -509,7 +513,7 @@ void* workerThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -static SConn* connCreate() { +static SConn* createConn() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); return pConn; } @@ -517,22 +521,24 @@ static void connCloseCb(uv_handle_t* handle) { // impl later // } -static void connDestroy(SConn* conn) { +static void destroyConn(SConn* conn, bool clear) { if (conn == NULL) { return; } + if (clear) { + uv_handle_t handle = *((uv_handle_t*)conn->pTcp); + uv_close(&handle, NULL); + } uv_timer_stop(conn->pTimer); free(conn->pTimer); - // uv_close((uv_handle_t*)conn->pTcp, connCloseCb); free(conn->pTcp); free(conn->connBuf.buf); free(conn->pWriter); - // free(conn); - // handle + free(conn); } -static void uvConnDestroy(uv_handle_t* handle) { +static void uvDestroyConn(uv_handle_t* handle) { SConn* conn = handle->data; - connDestroy(conn); + destroyConn(conn, false); } static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) { STransMsgHead* pHead = (STransMsgHead*)msg; diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 4ccbb60cc2..fd3496cc17 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,8 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - // tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - // pMsg->code); + tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -63,6 +63,8 @@ static void *sendRequest(void *param) { if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); + tDebug("recv response"); + // usleep(100000000); } tDebug("thread:%d, it is over", pInfo->index); @@ -98,7 +100,7 @@ int main(int argc, char *argv[]) { rpcInit.numOfThreads = 1; rpcInit.cfp = processResponse; rpcInit.sessions = 100; - rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.idleTime = 100; rpcInit.user = "michael"; rpcInit.secret = secret; rpcInit.ckey = "key"; diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 2e32aa57ca..12d8a01819 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -122,7 +122,7 @@ int main(int argc, char *argv[]) { rpcInit.numOfThreads = 1; rpcInit.cfp = processRequestMsg; rpcInit.sessions = 1000; - rpcInit.idleTime = tsShellActivityTimer * 1500; + rpcInit.idleTime = 2 * 1500; rpcInit.afp = retrieveAuthInfo; for (int i = 1; i < argc; ++i) { -- GitLab