提交 e7a72d5a 编写于 作者: dengyihao's avatar dengyihao

fix: no resp when host machine shutdown

上级 b1192d50
...@@ -671,7 +671,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -671,7 +671,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
transSetConnOption((uv_tcp_t*)conn->stream); // transSetConnOption((uv_tcp_t*)conn->stream);
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
...@@ -1061,9 +1061,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1061,9 +1061,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
STraceId* trace = &pMsg->msg.info.traceId;
if (!EPSET_IS_VALID(&pCtx->epSet)) { if (!EPSET_IS_VALID(&pCtx->epSet)) {
tError("invalid epset"); tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
} }
...@@ -1121,10 +1122,29 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1121,10 +1122,29 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port); addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
STraceId* trace = &(pMsg->msg.info.traceId);
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
cliHandleExcept(conn);
errno = 0;
return;
}
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret != 0) {
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn);
return;
}
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
...@@ -1139,7 +1159,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1139,7 +1159,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
} }
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s conn %p ready", pTransInst->label, conn); tGTrace("%s conn %p ready", pTransInst->label, conn);
} }
static void cliAsyncCb(uv_async_t* handle) { static void cliAsyncCb(uv_async_t* handle) {
......
...@@ -205,6 +205,10 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -205,6 +205,10 @@ bool transReadComplete(SConnBuffer* connBuf) {
} }
int transSetConnOption(uv_tcp_t* stream) { int transSetConnOption(uv_tcp_t* stream) {
#if defined(WINDOWS) || defined(DARWIN)
#else
uv_tcp_keepalive(stream, 1, 20);
#endif
return uv_tcp_nodelay(stream, 1); return uv_tcp_nodelay(stream, 1);
// int ret = uv_tcp_keepalive(stream, 5, 60); // int ret = uv_tcp_keepalive(stream, 5, 60);
} }
......
...@@ -55,7 +55,7 @@ typedef struct TdSocket { ...@@ -55,7 +55,7 @@ typedef struct TdSocket {
#endif #endif
int refId; int refId;
SocketFd fd; SocketFd fd;
} * TdSocketPtr, TdSocket; } *TdSocketPtr, TdSocket;
typedef struct TdSocketServer { typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK #if SOCKET_WITH_LOCK
...@@ -63,7 +63,7 @@ typedef struct TdSocketServer { ...@@ -63,7 +63,7 @@ typedef struct TdSocketServer {
#endif #endif
int refId; int refId;
SocketFd fd; SocketFd fd;
} * TdSocketServerPtr, TdSocketServer; } *TdSocketServerPtr, TdSocketServer;
typedef struct TdEpoll { typedef struct TdEpoll {
#if SOCKET_WITH_LOCK #if SOCKET_WITH_LOCK
...@@ -71,7 +71,7 @@ typedef struct TdEpoll { ...@@ -71,7 +71,7 @@ typedef struct TdEpoll {
#endif #endif
int refId; int refId;
EpollFd fd; EpollFd fd;
} * TdEpollPtr, TdEpoll; } *TdEpollPtr, TdEpoll;
#if 0 #if 0
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr, int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
...@@ -1005,7 +1005,7 @@ int32_t taosGetFqdn(char *fqdn) { ...@@ -1005,7 +1005,7 @@ int32_t taosGetFqdn(char *fqdn) {
// immediately // immediately
// hints.ai_family = AF_INET; // hints.ai_family = AF_INET;
strcpy(fqdn, hostname); strcpy(fqdn, hostname);
strcpy(fqdn+strlen(hostname), ".local"); strcpy(fqdn + strlen(hostname), ".local");
#else // __APPLE__ #else // __APPLE__
struct addrinfo hints = {0}; struct addrinfo hints = {0};
struct addrinfo *result = NULL; struct addrinfo *result = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册