diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 4d3a1b70cbd713bce8f06986882f50f46889faa9..c760acd52e80443d2a4bb7b5874ce7258e687798 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -201,4 +201,11 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transConnCtxDestroy(STransConnCtx* ctx); +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 48398316f1ea19c24bbbfb4e551ff632a621bc6b..cf1e1539654347933898860548cd984cd188d1a9 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -17,13 +17,6 @@ #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { taosInitServer, taosInitClient}; void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; @@ -46,10 +39,11 @@ void* rpcOpen(const SRpcInit* pInit) { void rpcClose(void* arg) { SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); + free(pRpc); return; } void* rpcMallocCont(int contLen) { - int size = contLen + RPC_MSG_OVERHEAD; + int size = contLen + TRANS_MSG_OVERHEAD; char* start = (char*)calloc(1, (size_t)size); if (start == NULL) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c96a0f81e05e01a40fc6c4d34b909d5b4beb6888..86e9c05ccb541f17261d4aec80bd007e36cd34db 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,6 +21,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; + SConnBuffer readBuf; void* data; queue conn; char spi; @@ -55,9 +56,17 @@ typedef struct SClientObj { static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +// process data read from server, auth/decompress etc +static void clientProcessData(SCliConn* conn); +// check whether already read complete packet from server +static bool clientReadComplete(SConnBuffer* pBuf); +// alloc buf for read static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +// callback after read nbytes from socket static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +// callback after write data to socket static void clientWriteCb(uv_write_t* req, int status); +// callback after conn to server static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); @@ -67,18 +76,72 @@ static void clientMsgDestroy(SCliMsg* pMsg); static void* clientThread(void* arg); +static void clientProcessData(SCliConn* conn) { + // impl +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static bool clientReadComplete(SConnBuffer* data) { + STransMsgHead head; + int32_t headLen = sizeof(head); + if (data->len >= headLen) { + memcpy((char*)&head, data->buf, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { // impl later + static const int CAPACITY = 512; + + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + buf->base = pBuf->buf; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); + } + } + buf->base = pBuf->buf + pBuf->len; + buf->len = pBuf->cap - pBuf->len; + } } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later - SCliConn* conn = handle->data; + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { + pBuf->len += nread; + if (clientReadComplete(pBuf)) { + tDebug("alread read complete pack"); + clientProcessData(conn); + } else { + tDebug("read halp packet, continue to read"); + } return; } + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } // uv_close((uv_handle_t*)handle, clientDestroy); } @@ -166,7 +229,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->writeReq->data = conn; clientWrite(conn); } else { - SCliConn* conn = malloc(sizeof(SCliConn)); + SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); @@ -248,6 +311,7 @@ void taosCloseClient(void* arg) { SCliThrdObj* pThrd = cli->pThreadObj[i]; pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); free(pThrd->loop); free(pThrd); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f4625dc0b3ca2fccd58edcef6b80541a2aa97af8..d096ab78139efaf9e613ddaf7be1bc63b4689192 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -16,13 +16,6 @@ #ifdef USE_UV #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -100,7 +93,8 @@ static void* acceptThread(void* arg); void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: - * |<-------SRpcReqContext------->|<------------data read from socket----------->| + * |<--------------------------data from socket------------------------------->| + * |<------STransMsgHead------->|<-------------------other data--------------->| */ static const int CAPACITY = 1024; @@ -133,7 +127,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - // SRpcHead rpcHead; STransMsgHead head; int32_t headLen = sizeof(head); if (data->len >= headLen) { @@ -270,13 +263,13 @@ static void uvProcessData(SConn* pConn) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SConn* ctx = cli->data; - SConnBuffer* pBuf = &ctx->connBuf; + SConn* conn = cli->data; + SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; if (readComplete(pBuf)) { tDebug("alread read complete packet"); - uvProcessData(ctx); + uvProcessData(conn); } else { tDebug("read half packet, continue to read"); } @@ -542,6 +535,7 @@ void taosCloseServer(void* arg) { free(srv->pipe); free(srv->pThreadObj); pthread_join(srv->thread, NULL); + free(srv); } void rpcSendResponse(const SRpcMsg* pMsg) {