提交 5f51fb3a 编写于 作者: dengyihao's avatar dengyihao

modify transport

上级 b2c24a03
......@@ -65,6 +65,7 @@ typedef struct SWorkThrdObj {
queue conn;
pthread_mutex_t msgMtx;
void* pTransInst;
bool stop;
} SWorkThrdObj;
typedef struct SServerObj {
......@@ -386,7 +387,6 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
transUnrefSrvHandle(c);
// destroyConn(c, true);
}
}
void uvWorkerAsyncCb(uv_async_t* handle) {
......@@ -411,10 +411,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
}
if (msg->pConn == NULL) {
free(msg);
destroyAllConn(pThrd);
uv_loop_close(pThrd->loop);
uv_stop(pThrd->loop);
bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
if (noConn == true) {
uv_loop_close(pThrd->loop);
uv_stop(pThrd->loop);
} else {
destroyAllConn(pThrd);
uv_loop_close(pThrd->loop);
pThrd->stop = true;
}
} else {
uvStartSendResp(msg);
}
......@@ -422,12 +427,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
}
static void uvAcceptAsyncCb(uv_async_t* async) {
SServerObj* srv = async->data;
tDebug("close server port %d", srv->port);
uv_close((uv_handle_t*)&srv->server, NULL);
uv_stop(srv->loop);
}
static void uvShutDownCb(uv_shutdown_t* req, int status) {
tDebug("conn failed to shut down: %s", uv_err_name(status));
if (status != 0) {
tDebug("conn failed to shut down: %s", uv_err_name(status));
}
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
free(req);
}
......@@ -509,14 +517,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
int addrlen = sizeof(pConn->addr);
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
tError("server conn %p failed to get peer info", pConn);
destroyConn(pConn, true);
transUnrefSrvHandle(pConn);
return;
}
addrlen = sizeof(pConn->locaddr);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
tError("server conn %p failed to get local info", pConn);
destroyConn(pConn, true);
transUnrefSrvHandle(pConn);
return;
}
......@@ -524,7 +532,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
} else {
tDebug("failed to create new connection");
destroyConn(pConn, true);
transUnrefSrvHandle(pConn);
}
}
......@@ -602,6 +610,7 @@ static SSrvConn* createConn(void* hThrd) {
tTrace("conn %p created", pConn);
pConn->broken = false;
transRefSrvHandle(pConn);
return pConn;
}
......@@ -617,25 +626,26 @@ static void destroyConn(SSrvConn* conn, bool clear) {
destroySmsg(msg);
}
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
QUEUE_REMOVE(&conn->queue);
if (clear) {
tTrace("try to destroy conn %p", conn);
// uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
// uv_unref((uv_handle_t*)conn->pTcp);
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
SSrvConn* conn = handle->data;
SWorkThrdObj* thrd = conn->hostThrd;
tDebug("server conn %p destroy", conn);
uv_timer_stop(conn->pTimer);
// free(conn->pTimer);
QUEUE_REMOVE(&conn->queue);
free(conn->pTcp);
free(conn->pWriter);
free(conn);
if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) {
uv_stop(thrd->loop);
}
}
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg;
......@@ -670,6 +680,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
thrd->stop = false;
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
......@@ -767,6 +778,7 @@ void transUnrefSrvHandle(void* handle) {
return;
}
int ref = T_REF_DEC((SSrvConn*)handle);
tDebug("handle %p ref count: %d", handle, ref);
if (ref == 0) {
destroyConn((SSrvConn*)handle, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册