提交 9bdd2a8e 编写于 作者: F Frankie Wu

auto reconnect

上级 2c952ef9
......@@ -3,6 +3,8 @@ package com.dianping.cat.message.io;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
......@@ -11,6 +13,7 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
......@@ -21,7 +24,7 @@ import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class TcpSocketSender implements MessageSender {
public class TcpSocketSender implements MessageSender, LogEnabled {
@Inject
private String m_host;
......@@ -35,13 +38,25 @@ public class TcpSocketSender implements MessageSender {
private ChannelFuture m_future;
private ClientBootstrap m_bootstrap;
private int m_reconnectPeriod = 5000; // every 5 seconds
private long m_lastReconnectTime;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() {
if (m_host == null) {
throw new RuntimeException("No host was configured for TcpSocketSender!");
}
InetSocketAddress address = new InetSocketAddress(m_host, m_port);
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
......@@ -55,24 +70,56 @@ public class TcpSocketSender implements MessageSender {
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
InetSocketAddress address = new InetSocketAddress(m_host, m_port);
ChannelFuture future = bootstrap.connect(address);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
m_logger.error("Error when connecting to " + address, future.getCause());
} else {
m_factory = factory;
m_future = future;
m_logger.info("Connected to CAT server at " + address);
}
m_bootstrap = bootstrap;
}
public void reconnect() {
long now = System.currentTimeMillis();
if (m_lastReconnectTime > 0 && m_lastReconnectTime + m_reconnectPeriod > now) {
return;
}
m_lastReconnectTime = now;
InetSocketAddress address = new InetSocketAddress(m_host, m_port);
ChannelFuture future = m_bootstrap.connect(address);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
m_logger.error("Error when reconnecting to " + address, future.getCause());
} else {
m_future = future;
m_logger.info("Reconnected to CAT server at " + address);
}
}
@Override
public void send(MessageTree tree) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(20 * 1024); // 20K
if (m_future == null || !m_future.getChannel().isOpen()) {
reconnect();
}
m_codec.encode(tree, buf);
m_future.getChannel().write(buf);
if (m_future != null && m_future.getChannel().isOpen()) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(20 * 1024); // 20K
m_codec.encode(tree, buf);
m_future.getChannel().write(buf);
}
}
public void setCodec(MessageCodec codec) {
......@@ -87,6 +134,10 @@ public class TcpSocketSender implements MessageSender {
m_port = port;
}
public void setReconnectPeriod(int reconnectPeriod) {
m_reconnectPeriod = reconnectPeriod;
}
@Override
public void shutdown() {
m_future.getChannel().getCloseFuture().awaitUninterruptibly();
......@@ -95,9 +146,12 @@ public class TcpSocketSender implements MessageSender {
class MyHandler extends SimpleChannelHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getChannel().close();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册