提交 7142f133 编写于 作者: dengyihao's avatar dengyihao

refactor rpc code

上级 f65d33ea
...@@ -317,6 +317,11 @@ typedef struct STransReq { ...@@ -317,6 +317,11 @@ typedef struct STransReq {
void* data; void* data;
} STransReq; } STransReq;
void transReqQueueInit(queue* q);
void* transReqQueuePushReq(queue* q);
void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q);
// queue sending msgs // queue sending msgs
typedef struct { typedef struct {
SArray* q; SArray* q;
......
...@@ -19,7 +19,7 @@ typedef struct SCliConn { ...@@ -19,7 +19,7 @@ typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
uv_write_t writeReq; queue wreqQueue;
void* hostThrd; void* hostThrd;
...@@ -586,9 +586,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -586,9 +586,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
conn->writeReq.data = conn;
conn->connReq.data = conn; conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
...@@ -627,6 +628,8 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -627,6 +628,8 @@ static void cliDestroy(uv_handle_t* handle) {
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
transQueueDestroy(&conn->cliMsgs); transQueueDestroy(&conn->cliMsgs);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue);
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
taosMemoryFree(conn); taosMemoryFree(conn);
} }
...@@ -649,11 +652,8 @@ static bool cliHandleNoResp(SCliConn* conn) { ...@@ -649,11 +652,8 @@ static bool cliHandleNoResp(SCliConn* conn) {
return res; return res;
} }
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req && req->handle ? req->handle->data : NULL; SCliConn* pConn = transReqQueueRemove(req);
taosMemoryFree(req); if (pConn == NULL) return;
if (pConn == NULL) {
return;
}
if (status == 0) { if (status == 0) {
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
...@@ -711,7 +711,7 @@ void cliSend(SCliConn* pConn) { ...@@ -711,7 +711,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
......
...@@ -293,6 +293,48 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { ...@@ -293,6 +293,48 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
return ret; return ret;
} }
void transReqQueueInit(queue* q) {
// init req queue
QUEUE_INIT(q);
}
void* transReqQueuePushReq(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
req->data = wreq;
QUEUE_PUSH(q, &wreq->q);
return req;
}
void* transReqQueueRemove(void* arg) {
void* ret = NULL;
uv_write_t* req = arg;
STransReq* wreq = req && req->data ? req->data : NULL;
assert(wreq->data == req);
if (wreq == NULL || wreq->data == NULL) {
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
return req;
}
QUEUE_REMOVE(&wreq->q);
ret = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
return ret;
}
void transReqQueueClear(queue* q) {
while (!QUEUE_IS_EMPTY(q)) {
queue* h = QUEUE_HEAD(q);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req);
}
}
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*)); queue->q = taosArrayInit(2, sizeof(void*));
queue->freeFunc = (void (*)(const void*))freeFunc; queue->freeFunc = (void (*)(const void*))freeFunc;
......
...@@ -331,14 +331,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { ...@@ -331,14 +331,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
STransReq* wreq = req && req->data ? req->data : NULL; SSvrConn* conn = transReqQueueRemove(req);
SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
if (wreq != NULL && conn != NULL) {
QUEUE_REMOVE(&wreq->q);
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
}
if (conn == NULL) return; if (conn == NULL) return;
if (status == 0) { if (status == 0) {
...@@ -442,12 +435,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { ...@@ -442,12 +435,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
req->data = wreq;
QUEUE_PUSH(&pConn->wreqQueue, &wreq->q);
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
...@@ -757,7 +745,7 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -757,7 +745,7 @@ static SSvrConn* createConn(void* hThrd) {
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
QUEUE_INIT(&pConn->wreqQueue); transReqQueueInit(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
...@@ -834,13 +822,7 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -834,13 +822,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
destroySmsg(msg); destroySmsg(msg);
} }
while (!QUEUE_IS_EMPTY(&conn->wreqQueue)) { transReqQueueClear(&conn->wreqQueue);
queue* h = QUEUE_HEAD(&conn->wreqQueue);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req);
}
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册