提交 9913d0c7 编写于 作者: dengyihao's avatar dengyihao

fix except

上级 5262b4ad
......@@ -26,7 +26,7 @@ typedef struct SCliConn {
T_REF_DECLARE()
uv_connect_t connReq;
uv_stream_t* stream;
uv_write_t* writeReq;
uv_write_t writeReq;
void* hostThrd;
SConnBuffer readBuf;
void* data;
......@@ -34,12 +34,12 @@ typedef struct SCliConn {
uint64_t expireTime;
int8_t ctnRdCnt; // continue read count
int hThrdIdx;
bool broken; // link broken or not
int persist; //
// spi configure
char spi;
char secured;
int32_t ref;
char spi;
char secured;
// debug and log info
struct sockaddr_in addr;
struct sockaddr_in locaddr;
......@@ -54,11 +54,10 @@ typedef struct SCliMsg {
} SCliMsg;
typedef struct SCliThrdObj {
pthread_t thread;
uv_loop_t* loop;
// uv_async_t* cliAsync; //
pthread_t thread;
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_timer_t* timer;
uv_timer_t timer;
void* pool; // conn pool
// msg queue
......@@ -83,7 +82,7 @@ typedef struct SConnList {
// conn pool
// add expire timeout and capacity limit
static void* creatConnPool(int size);
static void* createConnPool(int size);
static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
......@@ -99,8 +98,10 @@ static void clientWriteCb(uv_write_t* req, int status);
// callback after conn to server
static void clientConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static SCliConn* clientConnCreate(SCliThrdObj* thrd);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientDestroy(uv_handle_t* handle);
// process data read from server, add decompress etc later
static void clientHandleResp(SCliConn* conn);
......@@ -176,14 +177,14 @@ static void clientHandleResp(SCliConn* conn) {
conn->data = NULL;
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
// handle conn except in conn pool
clientConnDestroy(pConn, true);
transUnrefCliHandle(pConn);
return;
}
SCliThrdObj* pThrd = pConn->hostThrd;
......@@ -209,7 +210,7 @@ static void clientHandleExcept(SCliConn* pConn) {
pConn->data = NULL;
tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
clientConnDestroy(pConn, true);
transUnrefCliHandle(pConn);
}
static void clientTimeoutCb(uv_timer_t* handle) {
......@@ -225,9 +226,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
// uv_stream_t stm = *(c->stream);
// uv_close((uv_handle_t*)&stm, clientDestroy);
clientConnDestroy(c, true);
transUnrefCliHandle(c);
} else {
break;
}
......@@ -238,7 +237,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
static void* creatConnPool(int size) {
static void* createConnPool(int size) {
// thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
}
......@@ -253,7 +252,7 @@ static void* destroyConnPool(void* pool) {
}
connList = taosHashIterate((SHashObj*)pool, connList);
}
taosHashClear(pool);
taosHashCleanup(pool);
}
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
......@@ -328,26 +327,38 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
}
if (nread < 0) {
tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
conn->broken = true;
clientHandleExcept(conn);
}
}
static SCliConn* clientConnCreate(SCliThrdObj* pThrd) {
SCliConn* conn = calloc(1, sizeof(SCliConn));
// read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn;
conn->writeReq.data = conn;
conn->connReq.data = conn;
QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd;
conn->broken = false;
transRefCliHandle(conn);
return conn;
}
static void clientConnDestroy(SCliConn* conn, bool clear) {
//
conn->ref--;
if (conn->ref == 0) {
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
}
static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data;
free(conn->stream);
free(conn->writeReq);
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn);
}
......@@ -359,7 +370,6 @@ static void clientWriteCb(uv_write_t* req, int status) {
tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) {
// handle
return;
}
destroyUserdata(&pMsg->msg);
......@@ -410,7 +420,7 @@ static void clientWrite(SCliConn* pConn) {
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
static void clientConnCb(uv_connect_t* req, int status) {
// impl later
......@@ -436,10 +446,10 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("client work thread %p start to quit", pThrd);
destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
// transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer);
uv_timer_stop(&pThrd->timer);
pThrd->quit = true;
// uv__async_stop(pThrd->cliAsync);
uv_stop(pThrd->loop);
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
......@@ -463,7 +473,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (conn != NULL) {
conn->data = pMsg;
conn->writeReq->data = conn;
conn->writeReq.data = conn;
transDestroyBuffer(&conn->readBuf);
if (pThrd->quit) {
......@@ -472,21 +482,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
clientWrite(conn);
} else {
conn = calloc(1, sizeof(SCliConn));
conn->ref++;
// read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn;
conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn;
QUEUE_INIT(&conn->conn);
conn->connReq.data = conn;
conn = clientConnCreate(pThrd);
conn->data = pMsg;
conn->hostThrd = pThrd;
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) {
......@@ -585,11 +582,10 @@ static SCliThrdObj* createThrdObj() {
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb);
pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer);
pThrd->timer->data = pThrd;
uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd;
pThrd->pool = creatConnPool(4);
pThrd->pool = createConnPool(4);
pThrd->quit = false;
return pThrd;
......@@ -602,8 +598,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
transDestroyAsyncPool(pThrd->asyncPool);
// free(pThrd->cliAsync);
free(pThrd->timer);
uv_timer_stop(&pThrd->timer);
free(pThrd->loop);
free(pThrd);
}
......@@ -649,6 +645,7 @@ void transUnrefCliHandle(void* handle) {
}
int ref = T_REF_DEC((SCliConn*)handle);
if (ref == 0) {
clientConnDestroy((SCliConn*)handle, true);
}
// unref cli handle
......
......@@ -226,9 +226,13 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
uvBuf->base = p->buf;
uvBuf->len = CAPACITY;
} else if (p->total == -1 && p->len < CAPACITY) {
uvBuf->base = p->buf + p->len;
uvBuf->len = CAPACITY - p->len;
} else {
p->cap = p->total;
p->buf = realloc(p->buf, p->cap);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len;
}
......
......@@ -19,11 +19,10 @@
typedef struct SSrvConn {
T_REF_DECLARE()
uv_tcp_t* pTcp;
uv_write_t* pWriter;
uv_timer_t* pTimer;
uv_tcp_t* pTcp;
uv_write_t pWriter;
uv_timer_t pTimer;
// uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
......@@ -65,7 +64,7 @@ typedef struct SWorkThrdObj {
queue conn;
pthread_mutex_t msgMtx;
void* pTransInst;
bool stop;
bool quit;
} SWorkThrdObj;
typedef struct SServerObj {
......@@ -236,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) {
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
// validate msg type
}
......@@ -312,6 +311,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} else {
tError("fail to dispatch conn to work thread");
}
free(req);
}
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
......@@ -349,8 +349,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
uv_timer_stop(&pConn->pTimer);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
......@@ -417,8 +417,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
uv_stop(pThrd->loop);
} else {
destroyAllConn(pThrd);
uv_loop_close(pThrd->loop);
pThrd->stop = true;
// uv_loop_close(pThrd->loop);
pThrd->quit = true;
}
} else {
uvStartSendResp(msg);
......@@ -493,9 +493,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pConn->pTimer);
pConn->pTimer->data = pConn;
uv_timer_init(pThrd->loop, &pConn->pTimer);
pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd;
......@@ -504,8 +503,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
pConn->pWriter = calloc(1, sizeof(uv_write_t));
pConn->pWriter->data = pConn;
pConn->pWriter.data = pConn;
transSetConnOption((uv_tcp_t*)pConn->pTcp);
......@@ -633,17 +631,20 @@ static void destroyConn(SSrvConn* conn, bool clear) {
}
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
SSrvConn* conn = handle->data;
if (conn == NULL) {
return;
}
SWorkThrdObj* thrd = conn->hostThrd;
tDebug("server conn %p destroy", conn);
uv_timer_stop(conn->pTimer);
uv_timer_stop(&conn->pTimer);
QUEUE_REMOVE(&conn->queue);
free(conn->pTcp);
free(conn->pWriter);
free(conn);
// free(conn);
if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) {
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop);
uv_stop(thrd->loop);
}
}
......@@ -680,7 +681,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
thrd->stop = false;
thrd->quit = false;
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册