提交 1c519bca 编写于 作者: dengyihao's avatar dengyihao

redefine timer

上级 5c5d70e0
...@@ -96,7 +96,7 @@ typedef void* queue[2]; ...@@ -96,7 +96,7 @@ typedef void* queue[2];
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3000 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512
......
...@@ -25,7 +25,9 @@ typedef struct SCliConn { ...@@ -25,7 +25,9 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
queue wreqQueue; queue wreqQueue;
uv_timer_t* timer;
uv_timer_t* timer; // read timer, forbidden
uv_timer_t connTimer;
void* hostThrd; void* hostThrd;
...@@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); ...@@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
// register timer for read // register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle); static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
...@@ -462,6 +466,12 @@ void cliHandleExcept(SCliConn* conn) { ...@@ -462,6 +466,12 @@ void cliHandleExcept(SCliConn* conn) {
cliHandleExceptImpl(conn, -1); cliHandleExceptImpl(conn, -1);
} }
void cliConnTimeout(uv_timer_t* handle) {
SCliConn* conn = handle->data;
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_timer_stop(handle);
cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
void cliReadTimeoutCb(uv_timer_t* handle) { void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb // set up timeout cb
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
...@@ -638,8 +648,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -638,8 +648,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
conn->connReq.data = conn; uv_timer_init(pThrd->loop, &conn->connTimer);
conn->connTimer.data = conn;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
...@@ -819,6 +831,8 @@ _RETURN: ...@@ -819,6 +831,8 @@ _RETURN:
void cliConnCb(uv_connect_t* req, int status) { void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
uv_timer_stop(&pConn->connTimer);
if (status != 0) { if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
...@@ -997,31 +1011,21 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -997,31 +1011,21 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) {
tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
}
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
if (fd == -1) {
tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
cliHandleExcept(conn);
return;
}
uv_tcp_open((uv_tcp_t*)conn->stream, fd);
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port); addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
uv_timer_stop(&conn->connTimer);
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
} }
uv_timer_start(&conn->connTimer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
} }
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s conn %p ready", pTransInst->label, conn); tGTrace("%s conn %p ready", pTransInst->label, conn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册