From 41c3160daaef575c209967ec40517361aa649764 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 16 Jan 2022 21:57:42 +0800 Subject: [PATCH] refactor code --- source/libs/transport/src/transport.c | 81 ++++++++++++++++++--------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c index f33d54c4f3..93bbaf2820 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transport.c @@ -135,12 +135,13 @@ typedef struct SRpcConn { uv_async_t* pWorkerAsync; queue queue; int ref; - int persist; // persist connection or not - SConnBuffer connBuf; + int persist; // persist connection or not + SConnBuffer connBuf; // read buf, + SConnBuffer writeBuf; // write buf int count; void* shandle; // rpc init - void* ahandle; - + void* ahandle; // + void* hostThread; // del later char secured; int spi; @@ -335,6 +336,11 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { return code; } +// refers specifically to query or insert timeout +static void uvHandleActivityTimeout(uv_timer_t* handle) { + // impl later + SRpcConn* conn = handle->data; +} static void uvProcessData(SRpcConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; @@ -358,8 +364,8 @@ static void uvProcessData(SRpcConn* pConn) { int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); if (code != 0) { terrno = code; + return; } - // rpcCheckAuthentication(pConn, (char*)pHead, pBuf->len); pHead->code = htonl(pHead->code); SRpcMsg rpcMsg; @@ -371,7 +377,9 @@ static void uvProcessData(SRpcConn* pConn) { rpcMsg.code = pHead->code; rpcMsg.ahandle = pConn->ahandle; rpcMsg.handle = pConn; + (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); + uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); // auth // validate msg type } @@ -419,21 +427,23 @@ void uvOnWriteCb(uv_write_t* req, int status) { } void uvWorkerAsyncCb(uv_async_t* handle) { - SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); + SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync); SRpcConn* conn = NULL; // opt later - pthread_mutex_lock(&pObj->connMtx); - if (!QUEUE_IS_EMPTY(&pObj->conn)) { - queue* head = QUEUE_HEAD(&pObj->conn); + pthread_mutex_lock(&pThrd->connMtx); + if (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* head = QUEUE_HEAD(&pThrd->conn); conn = QUEUE_DATA(head, SRpcConn, queue); QUEUE_REMOVE(&conn->queue); } - pthread_mutex_unlock(&pObj->connMtx); + pthread_mutex_unlock(&pThrd->connMtx); if (conn == NULL) { tError("except occurred, do nothing"); return; } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } void uvOnAcceptCb(uv_stream_t* stream, int status) { @@ -472,7 +482,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(buf->base[0] == notify[0]); free(buf->base); - SThreadObj* pObj = q->data; + SThreadObj* pThrd = q->data; uv_pipe_t* pipe = (uv_pipe_t*)q; if (!uv_pipe_pending_count(pipe)) { @@ -484,16 +494,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(pending == UV_TCP); SRpcConn* pConn = connCreate(); - pConn->shandle = pObj->shandle; + pConn->shandle = pThrd->shandle; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pObj->loop, pConn->pTimer); + uv_timer_init(pThrd->loop, pConn->pTimer); + pConn->pTimer->data = pConn; - pConn->pWorkerAsync = pObj->workerAsync; // thread safty + pConn->hostThread = pThrd; + pConn->pWorkerAsync = pThrd->workerAsync; // thread safty // init client handle pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pObj->loop, pConn->pTcp); + uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; // init write request, just @@ -527,23 +539,23 @@ void* acceptThread(void* arg) { uv_run(srv->loop, UV_RUN_DEFAULT); } void* workerThread(void* arg) { - SThreadObj* pObj = (SThreadObj*)arg; + SThreadObj* pThrd = (SThreadObj*)arg; - pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pObj->loop); + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); - uv_pipe_init(pObj->loop, pObj->pipe, 1); - uv_pipe_open(pObj->pipe, pObj->fd); + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); - pObj->pipe->data = pObj; + pThrd->pipe->data = pThrd; - QUEUE_INIT(&pObj->conn); + QUEUE_INIT(&pThrd->conn); - pObj->workerAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb); + pThrd->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); - uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb); - uv_run(pObj->loop, UV_RUN_DEFAULT); + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_run(pThrd->loop, UV_RUN_DEFAULT); } static SRpcConn* connCreate() { SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn)); @@ -583,9 +595,22 @@ void* rpcMallocCont(int contLen) { return NULL; } void rpcFreeCont(void* cont) { return; } void* rpcReallocCont(void* ptr, int contLen) { return NULL; } -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } +void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { + // impl later + return; +} + +void rpcSendResponse(const SRpcMsg* pMsg) { + SRpcConn* pConn = pMsg->handle; + SThreadObj* pThrd = pConn->hostThread; -void rpcSendResponse(const SRpcMsg* pMsg) {} + // opt later + pthread_mutex_lock(&pThrd->connMtx); + QUEUE_PUSH(&pThrd->conn, &pConn->queue); + pthread_mutex_unlock(&pThrd->connMtx); + + uv_async_send(pConn->pWorkerAsync); +} void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -- GitLab