提交 85d814ed 编写于 作者: Y youyong205

fix the tcpsocket

上级 52c2b75c
package com.dianping.cat.message.io;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -119,8 +117,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
}
}
}
m_manager.releaseAll();
}
@Override
......@@ -169,23 +165,21 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
m_manager.shutdown();
}
static class ChannelManager implements Task {
private static class ChannelManager implements Task {
private List<InetSocketAddress> m_serverAddresses;
private List<ChannelFuture> m_futures;
private ClientBootstrap m_bootstrap;
private ChannelFuture m_activeFuture;
private int m_activeIndex;
private Logger m_logger;
private ChannelFuture m_lastFuture;
private boolean m_active = true;
private int m_activeIndex = -1;
private AtomicInteger m_reconnects = new AtomicInteger(999);
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses) {
......@@ -193,7 +187,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
m_logger = logger;
m_serverAddresses = serverAddresses;
m_futures = new ArrayList<ChannelFuture>(Collections.<ChannelFuture> nCopies(len, null));
ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Boss", 10);
ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10);
......@@ -223,24 +216,28 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
}
}
ChannelFuture createChannel(int index) {
private ChannelFuture createChannel(int index) {
InetSocketAddress address = m_serverAddresses.get(index);
ChannelFuture future = m_bootstrap.connect(address);
try {
ChannelFuture future = m_bootstrap.connect(address);
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms
if (!future.isSuccess()) {
future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
int count = m_reconnects.incrementAndGet();
if (!future.isSuccess()) {
future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
int count = m_reconnects.incrementAndGet();
if (count % 1000 == 0) {
m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
if (count % 100 == 0) {
m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
}
} else {
m_logger.info("Connected to CAT server at " + address);
return future;
}
return null;
} else {
m_logger.info("Connected to CAT server at " + address);
return future;
} catch (Throwable e) {
m_logger.error("Error when connect server " + address.getAddress(), e);
}
return null;
}
public ChannelFuture getChannel() {
......@@ -257,17 +254,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
return "TcpSocketHierarchySender-ChannelManager";
}
public void releaseAll() {
for (ChannelFuture future : m_futures) {
if (future != null) {
future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
}
}
m_bootstrap.getFactory().releaseExternalResources();
m_futures = null;
}
@Override
public void run() {
try {
......@@ -276,7 +262,10 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) {
m_activeIndex = m_serverAddresses.size();
}
if (m_activeIndex == -1) {
m_activeIndex = m_serverAddresses.size();
}
for (int i = 0; i < m_activeIndex; i++) {
ChannelFuture future = createChannel(i);
......@@ -304,7 +293,7 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
}
}
static class ExceptionHandler extends SimpleChannelHandler {
private static class ExceptionHandler extends SimpleChannelHandler {
private Logger m_logger;
public ExceptionHandler(Logger logger) {
......@@ -314,10 +303,12 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
e.getChannel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
e.getChannel().close();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册