From 5941437be1beff0ef47193c217607baa40b3f2f9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 22 Feb 2022 12:51:34 +0800 Subject: [PATCH] refactor code --- .../dnode/mgmt/impl/test/sut/src/client.cpp | 5 ++- source/libs/transport/inc/transComm.h | 9 ++-- source/libs/transport/src/transCli.c | 30 +------------ source/libs/transport/src/transComm.c | 22 ++++++---- source/libs/transport/src/transSrv.c | 42 +------------------ 5 files changed, 26 insertions(+), 82 deletions(-) diff --git a/source/dnode/mgmt/impl/test/sut/src/client.cpp b/source/dnode/mgmt/impl/test/sut/src/client.cpp index 589c015013..b89cb02834 100644 --- a/source/dnode/mgmt/impl/test/sut/src/client.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/client.cpp @@ -24,6 +24,9 @@ static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { } void TestClient::SetRpcRsp(SRpcMsg* rsp) { + if (this->pRsp) { + free(this->pRsp); + } this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg)); this->pRsp->msgType = rsp->msgType; this->pRsp->code = rsp->code; @@ -60,7 +63,7 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint strcpy(this->user, user); strcpy(this->pass, pass); this->port = port; - // this->pRsp = NULL; + this->pRsp = NULL; this->DoInit(); return true; } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2078a218ee..d4d9bff56c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -238,10 +238,11 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool); int transSendAsync(SAsyncPool* pool, queue* mq); -int transInitBuffer(SConnBuffer* buf); -int transClearBuffer(SConnBuffer* buf); -int transDestroyBuffer(SConnBuffer* buf); -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +int transInitBuffer(SConnBuffer* buf); +int transClearBuffer(SConnBuffer* buf); +int transDestroyBuffer(SConnBuffer* buf); +int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +bool transReadComplete(SConnBuffer* connBuf); // int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c70cd933d5..8312c0217c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,8 +84,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); -// check whether already read complete packet from server -static bool clientReadComplete(SConnBuffer* pBuf); // alloc buf for read static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket @@ -309,32 +307,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); } -static bool clientReadComplete(SConnBuffer* data) { - if (data->len >= sizeof(STransMsgHead)) { - STransMsgHead head; - memcpy((char*)&head, data->buf, sizeof(head)); - int32_t msgLen = (int32_t)htonl(head.msgLen); - data->total = msgLen; - } - - if (data->len == data->cap && data->total == data->cap) { - return true; - } - return false; - // if (data->len >= headLen) { - // memcpy((char*)&head, data->buf, headLen); - // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - // if (msgLen > data->len) { - // data->left = msgLen - data->len; - // return false; - // } else if (msgLen == data->len) { - // data->left = 0; - // return true; - // } - //} else { - // return false; - //} -} static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -349,7 +321,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - if (clientReadComplete(pBuf)) { + if (transReadComplete(pBuf)) { tTrace("client conn %p read complete", conn); clientHandleResp(conn); } else { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 388e0da4e0..9a8607b0ed 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -222,23 +222,31 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { p->buf = (char*)calloc(CAPACITY, sizeof(char)); p->len = 0; p->cap = CAPACITY; - p->total = 0; + p->total = -1; uvBuf->base = p->buf; uvBuf->len = CAPACITY; } else { - STransMsgHead head; - memcpy((char*)&head, p->buf, sizeof(head)); - int32_t msgLen = (int32_t)htonl(head.msgLen); - - p->total = msgLen; - p->cap = msgLen; + p->cap = p->total; p->buf = realloc(p->buf, p->cap); uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; } return 0; } +// check whether already read complete +bool transReadComplete(SConnBuffer* connBuf) { + if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) { + STransMsgHead head; + memcpy((char*)&head, connBuf->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + connBuf->total = msgLen; + } + if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { + return true; + } + return false; +} int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {} int transUnpackMsg(STransMsgHead* msgHead) {} diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a038f72adc..9fca371bf3 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -104,7 +104,6 @@ static void uvStartSendResp(SSrvMsg* msg); static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet -static bool readComplete(SConnBuffer* buf); static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); @@ -124,45 +123,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b transAllocBuffer(pBuf, buf); } -// check data read from socket complete or not -// -static bool readComplete(SConnBuffer* data) { - // TODO(yihao): handle pipeline later - if (data->len == data->cap && data->total == data->cap) { - return true; - } - return false; - // STransMsgHead head; - // int32_t headLen = sizeof(head); - // if (data->len >= headLen) { - // memcpy((char*)&head, data->buf, headLen); - // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - // if (msgLen > data->len) { - // data->left = msgLen - data->len; - // return false; - // } else if (msgLen == data->len) { - // return true; - // } else if (msgLen < data->len) { - // return false; - // // handle other packet later - // } - //} else { - // return false; - //} -} - -// static void uvDoProcess(SRecvInfo* pRecv) { -// // impl later -// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; -// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; -// SSrvConn* pConn = pRecv->thandle; -// tDump(pRecv->msg, pRecv->msgLen); -// terrno = 0; -// // SRpcReqContext* pContest; -// -// // do auth and check -//} - static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -283,7 +243,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); - if (readComplete(pBuf)) { + if (transReadComplete(pBuf)) { tTrace("server conn %p alread read complete packet", conn); uvHandleReq(conn); } else { -- GitLab