From fdb79077c4c2d1c945a234a66d123a93794329df Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 15 Jan 2022 18:20:53 +0800 Subject: [PATCH] add libuv --- source/libs/transport/src/rpcMain.c | 212 +++++++++++++--------------- 1 file changed, 100 insertions(+), 112 deletions(-) diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index a1c0c05fc3..37ef10ba5b 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -66,10 +66,31 @@ typedef struct { struct SRpcConn* connList; // connection list } SRpcInfo; +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + SEpSet* pSet; // for synchronous API + char msg[0]; // RpcHead starts from here +} SRpcReqContext; + #ifdef USE_UV #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) static const char* notify = "a"; typedef struct SThreadObj { @@ -94,12 +115,12 @@ typedef struct SServerObj { uint32_t port; } SServerObj; -typedef struct SContent { +typedef struct SConnBuffer { char* buf; int len; int cap; - int toRead; -} SContent; + int left; +} SConnBuffer; typedef struct SConnCtx { uv_tcp_t* pTcp; @@ -110,18 +131,18 @@ typedef struct SConnCtx { queue queue; int ref; int persist; // persist connection or not - SContent pCont; + SConnBuffer connBuf; int count; } SConnCtx; -static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); -static void onWrite(uv_write_t* req, int status); -static void onAccept(uv_stream_t* stream, int status); -static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void workerAsyncCB(uv_async_t* handle); +static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void uvOnTimeoutCb(uv_timer_t* handle); +static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnAcceptCb(uv_stream_t* stream, int status); +static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void uvWorkerAsyncCb(uv_async_t* handle); static SConnCtx* connCtxCreate(); static void connCtxDestroy(SConnCtx* ctx); @@ -193,95 +214,68 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } -void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } -void rpcFreeCont(void* cont) { return; } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } - -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } -void rpcSendResponse(const SRpcMsg* pMsg) {} +void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + static const int CAPACITY = 1024; + /* + * formate of data buffer: + * |<-------SRpcReqContext------->|<------------data read from socket----------->| + */ -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } + SConnCtx* ctx = handle->data; + SConnBuffer* pBuf = &ctx->connBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; -void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - static const int CAPACITY = 1024; - tDebug("pre alloc buffer for read "); - SConnCtx* ctx = handle->data; - SContent* pCont = &ctx->pCont; - if (pCont->cap == 0) { - pCont->buf = (char*)calloc(CAPACITY, sizeof(char)); - pCont->len = 0; - pCont->cap = CAPACITY; - pCont->toRead = -1; - - buf->base = pCont->buf; + buf->base = pBuf->buf + RPC_RESERVE_SIZE; buf->len = CAPACITY; } else { - if (pCont->len >= pCont->cap) { - if (pCont->toRead == -1) { - pCont->cap *= 2; - pCont->buf = realloc(pCont->buf, pCont->cap); - } else if (pCont->len + pCont->toRead > pCont->cap) { - pCont->cap = pCont->len + pCont->toRead; - pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead); + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); } } - buf->base = pCont->buf + pCont->len; - buf->len = pCont->cap - pCont->len; - } - - // if (ctx->pCont.cap == 0) { - // ctx->pCont.buf = (char*)calloc(64, sizeof(char)); - // ctx->pCont.len = 0; - // ctx->pCont.cap = 64; - // // - // buf->base = ctx->pCont.buf; - // buf->len = sz; - //} else { - // if (ctx->pCont.len + sz > ctx->pCont.cap) { - // ctx->pCont.cap *= 2; - // ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap); - // } - // buf->base = ctx->pCont.buf + ctx->pCont.len; - // buf->len = sz; - //} -} -// change later -static bool handleUserData(SContent* data) { + buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; + buf->len = pBuf->cap - pBuf->len; + } +} +// check data read from socket completely or not +// +static bool isReadAll(SConnBuffer* data) { + // TODO(yihao): handle pipeline later SRpcHead rpcHead; - - bool finish = false; - int32_t msgLen, leftLen, retLen; - int32_t headLen = sizeof(rpcHead); + int32_t headLen = sizeof(rpcHead); if (data->len >= headLen) { memcpy((char*)&rpcHead, data->buf, headLen); - msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - if (msgLen + headLen <= data->len) { - return true; - } else { + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; return false; + } else { + return true; } } else { return false; } } -void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { +void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SConnCtx* ctx = cli->data; - SContent* pCont = &ctx->pCont; + SConnCtx* ctx = cli->data; + SConnBuffer* pBuf = &ctx->connBuf; if (nread > 0) { - pCont->len += nread; - bool finish = handleUserData(pCont); - if (finish == false) { - tDebug("continue read"); + pBuf->len += nread; + if (isReadAll(pBuf)) { + tDebug("alread read complete packet"); } else { - tDebug("read completely"); + tDebug("read half packet, continue to read"); } return; } @@ -291,17 +285,17 @@ void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } uv_close((uv_handle_t*)cli, uvConnCtxDestroy); } -void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = malloc(sizeof(char)); buf->len = 2; } -void onTimeout(uv_timer_t* handle) { +void uvOnTimeoutCb(uv_timer_t* handle) { // opt tDebug("time out"); } -void onWrite(uv_write_t* req, int status) { +void uvOnWriteCb(uv_write_t* req, int status) { SConnCtx* ctx = req->data; if (status == 0) { tDebug("data already was written on stream"); @@ -311,7 +305,7 @@ void onWrite(uv_write_t* req, int status) { // opt } -void workerAsyncCB(uv_async_t* handle) { +void uvWorkerAsyncCb(uv_async_t* handle) { SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); SConnCtx* conn = NULL; @@ -329,7 +323,7 @@ void workerAsyncCB(uv_async_t* handle) { } } -void onAccept(uv_stream_t* stream, int status) { +void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { return; } @@ -345,12 +339,12 @@ void onAccept(uv_stream_t* stream, int status) { pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); - uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); } } -void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { +void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tDebug("connection coming"); if (nread < 0) { if (nread != UV_EOF) { @@ -396,7 +390,7 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); - uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { connCtxDestroy(pConn); } @@ -412,7 +406,7 @@ void* acceptThread(void* arg) { uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { tError("Listen error %s\n", uv_err_name(err)); return NULL; } @@ -430,9 +424,9 @@ void* workerThread(void* arg) { QUEUE_INIT(&pObj->conn); pObj->workerAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); + uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb); - uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection); + uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_run(pObj->loop, UV_RUN_DEFAULT); } static SConnCtx* connCtxCreate() { @@ -455,6 +449,20 @@ static void uvConnCtxDestroy(uv_handle_t* handle) { SConnCtx* ctx = handle->data; connCtxDestroy(ctx); } +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } + +void rpcSendResponse(const SRpcMsg* pMsg) {} + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } #else @@ -465,26 +473,6 @@ static void uvConnCtxDestroy(uv_handle_t* handle) { #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) -typedef struct { - SRpcInfo * pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void * ahandle; // handle provided by app - struct SRpcConn *pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t * pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg * pRsp; // for synchronous API - tsem_t * pSem; // for synchronous API - SEpSet * pSet; // for synchronous API - char msg[0]; // RpcHead starts from here -} SRpcReqContext; - typedef struct SRpcConn { char info[48]; // debug info: label + pConn + ahandle int sid; // session ID -- GitLab