提交 f65d33ea 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 60a9b2f5
...@@ -311,6 +311,12 @@ void transCtxMerge(STransCtx* dst, STransCtx* src); ...@@ -311,6 +311,12 @@ void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key); void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list
typedef struct STransReq {
queue q;
void* data;
} STransReq;
// queue sending msgs // queue sending msgs
typedef struct { typedef struct {
SArray* q; SArray* q;
......
...@@ -29,7 +29,7 @@ typedef struct { ...@@ -29,7 +29,7 @@ typedef struct {
typedef struct SSvrConn { typedef struct SSvrConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t pWriter; queue wreqQueue;
uv_timer_t pTimer; uv_timer_t pTimer;
queue queue; queue queue;
...@@ -331,8 +331,14 @@ void uvOnTimeoutCb(uv_timer_t* handle) { ...@@ -331,8 +331,14 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSvrConn* conn = req && req->handle ? req->handle->data : NULL; STransReq* wreq = req && req->data ? req->data : NULL;
taosMemoryFree(req); SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
if (wreq != NULL && conn != NULL) {
QUEUE_REMOVE(&wreq->q);
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
}
if (conn == NULL) return; if (conn == NULL) return;
if (status == 0) { if (status == 0) {
...@@ -437,12 +443,16 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { ...@@ -437,12 +443,16 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
req->data = wreq;
QUEUE_PUSH(&pConn->wreqQueue, &wreq->q);
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
// impl // impl
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (pConn->broken == true) { if (pConn->broken == true) {
// persist by // persist by
transFreeMsg(smsg->msg.pCont); transFreeMsg(smsg->msg.pCont);
...@@ -639,8 +649,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -639,8 +649,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init(pThrd->loop, pConn->pTcp); uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn; pConn->pTcp->data = pConn;
pConn->pWriter.data = pConn;
transSetConnOption((uv_tcp_t*)pConn->pTcp); transSetConnOption((uv_tcp_t*)pConn->pTcp);
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
...@@ -748,6 +756,8 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -748,6 +756,8 @@ static SSvrConn* createConn(void* hThrd) {
SWorkThrd* pThrd = hThrd; SWorkThrd* pThrd = hThrd;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
QUEUE_INIT(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
...@@ -823,6 +833,14 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -823,6 +833,14 @@ static void uvDestroyConn(uv_handle_t* handle) {
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
destroySmsg(msg); destroySmsg(msg);
} }
while (!QUEUE_IS_EMPTY(&conn->wreqQueue)) {
queue* h = QUEUE_HEAD(&conn->wreqQueue);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req);
}
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册