diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 506b085ecd1c6f870ef733aa66c8a16748643d08..c760acd52e80443d2a4bb7b5874ce7258e687798 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -199,4 +199,13 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); +void transConnCtxDestroy(STransConnCtx* ctx); + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 48398316f1ea19c24bbbfb4e551ff632a621bc6b..cf1e1539654347933898860548cd984cd188d1a9 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -17,13 +17,6 @@ #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { taosInitServer, taosInitClient}; void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; @@ -46,10 +39,11 @@ void* rpcOpen(const SRpcInit* pInit) { void rpcClose(void* arg) { SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); + free(pRpc); return; } void* rpcMallocCont(int contLen) { - int size = contLen + RPC_MSG_OVERHEAD; + int size = contLen + TRANS_MSG_OVERHEAD; char* start = (char*)calloc(1, (size_t)size); if (start == NULL) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8622db9b3f04887acb44bf96f9f0a5f9b95337f5..86e9c05ccb541f17261d4aec80bd007e36cd34db 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,6 +21,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; + SConnBuffer readBuf; void* data; queue conn; char spi; @@ -55,28 +56,92 @@ typedef struct SClientObj { static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +// process data read from server, auth/decompress etc +static void clientProcessData(SCliConn* conn); +// check whether already read complete packet from server +static bool clientReadComplete(SConnBuffer* pBuf); +// alloc buf for read static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +// callback after read nbytes from socket static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +// callback after write data to socket static void clientWriteCb(uv_write_t* req, int status); +// callback after conn to server static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn); +static void clientMsgDestroy(SCliMsg* pMsg); + static void* clientThread(void* arg); +static void clientProcessData(SCliConn* conn) { + // impl +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static bool clientReadComplete(SConnBuffer* data) { + STransMsgHead head; + int32_t headLen = sizeof(head); + if (data->len >= headLen) { + memcpy((char*)&head, data->buf, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { // impl later + static const int CAPACITY = 512; + + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + buf->base = pBuf->buf; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); + } + } + buf->base = pBuf->buf + pBuf->len; + buf->len = pBuf->cap - pBuf->len; + } } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later - SCliConn* conn = handle->data; + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { + pBuf->len += nread; + if (clientReadComplete(pBuf)) { + tDebug("alread read complete pack"); + clientProcessData(conn); + } else { + tDebug("read halp packet, continue to read"); + } return; } + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } // uv_close((uv_handle_t*)handle, clientDestroy); } @@ -164,7 +229,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->writeReq->data = conn; clientWrite(conn); } else { - SCliConn* conn = malloc(sizeof(SCliConn)); + SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); @@ -235,9 +300,23 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return cli; } +static void clientMsgDestroy(SCliMsg* pMsg) { + // impl later + free(pMsg); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* pThrd = cli->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); + free(pThrd->loop); + free(pThrd); + } + free(cli->pThreadObj); + free(cli); } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { @@ -247,19 +326,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SRpcInfo* pRpc = (SRpcInfo*)shandle; - int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + int32_t flen = 0; + if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { + // imp later + } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->pRpc = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; - // pContext->contLen = len; - // pContext->pCont = pMsg->pCont; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; - // pContext->epSet = *pEpSet; - // pContext->oldInUse = pEpSet->inUse; assert(pRpc->connType == TAOS_CONN_CLIENT); // atomic or not diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 27cff80586e280a5d7ae5fba1e6dedf47bf5dc07..617abeea3953b120a876763020e1e7ffcb035307 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { + return false; // SRpcHead* pHead = rpcHeadFromCont(pCont); bool succ = false; int overhead = sizeof(STransCompMsg); @@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { return pHead; } +void transConnCtxDestroy(STransConnCtx* ctx) { + free(ctx->ip); + free(ctx); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 46caffd93c35cebc9a420e42e14b0238d49170c7..d096ab78139efaf9e613ddaf7be1bc63b4689192 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -16,13 +16,6 @@ #ifdef USE_UV #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -100,7 +93,8 @@ static void* acceptThread(void* arg); void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: - * |<-------SRpcReqContext------->|<------------data read from socket----------->| + * |<--------------------------data from socket------------------------------->| + * |<------STransMsgHead------->|<-------------------other data--------------->| */ static const int CAPACITY = 1024; @@ -133,7 +127,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - // SRpcHead rpcHead; STransMsgHead head; int32_t headLen = sizeof(head); if (data->len >= headLen) { @@ -270,13 +263,13 @@ static void uvProcessData(SConn* pConn) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SConn* ctx = cli->data; - SConnBuffer* pBuf = &ctx->connBuf; + SConn* conn = cli->data; + SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; if (readComplete(pBuf)) { tDebug("alread read complete packet"); - uvProcessData(ctx); + uvProcessData(conn); } else { tDebug("read half packet, continue to read"); } @@ -496,6 +489,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { @@ -530,6 +524,18 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* pThrd = srv->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + free(srv->pipe[i]); + free(pThrd->loop); + free(pThrd); + } + free(srv->loop); + free(srv->pipe); + free(srv->pThreadObj); + pthread_join(srv->thread, NULL); + free(srv); } void rpcSendResponse(const SRpcMsg* pMsg) {