提交 019f68f5 编写于 作者: Y youyong205

refactor the channel manager

上级 67d9aae3
......@@ -6,7 +6,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
......@@ -32,35 +31,22 @@ import com.dianping.cat.message.spi.MessageQueue;
import com.site.helper.Splitters;
public class ChannelManager implements Task {
private List<InetSocketAddress> m_serverAddresses;
private ClientConfigManager m_configManager;
private ClientBootstrap m_bootstrap;
private ChannelFuture m_activeFuture;
private Logger m_logger;
private ChannelFuture m_lastFuture;
private boolean m_active = true;
private int m_activeIndex = -1;
private int m_retriedTimes = 0;
private int m_count = 1;
private volatile int m_error = -1;
public static final int SIZE = 10000;
private AtomicInteger m_reconnects = new AtomicInteger(99);
private int m_count = -10;
private MessageQueue m_queue;
private String m_activeServerConfig;
private ChannelHolder m_activeChannelHolder;
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
ClientConfigManager configManager) {
......@@ -87,24 +73,35 @@ public class ChannelManager implements Task {
String serverConfig = loadServerConfig();
if (serverConfig != null) {
if (StringUtils.isNotEmpty(serverConfig)) {
List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
ChannelHolder holder = initChannel(configedAddresses, serverConfig);
initChannel(configedAddresses, serverConfig);
m_activeChannelHolder = holder;
} else {
initChannel(serverAddresses, null);
ChannelHolder holder = initChannel(serverAddresses, null);
m_activeChannelHolder = holder;
}
}
private void closeAllChannel() {
private void closeChannel(ChannelFuture channel) {
try {
if (m_activeFuture != null) {
m_activeFuture.getChannel().close();
if (channel != null) {
m_logger.info("close channel " + channel.getChannel().getRemoteAddress());
channel.getChannel().close();
}
} catch (Exception e) {
// ignore
}
if (m_lastFuture != null) {
m_lastFuture.getChannel().close();
}
m_activeIndex = -1;
private void closeChannelHolder(ChannelHolder channelHolder) {
try {
ChannelFuture channel = channelHolder.getActiveFuture();
closeChannel(channel);
channelHolder.setActiveIndex(-1);
} catch (Exception e) {
// ignore
}
......@@ -118,12 +115,8 @@ public class ChannelManager implements Task {
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms
if (!future.isSuccess()) {
int count = m_reconnects.incrementAndGet();
if (count % 100 == 0) {
m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
}
future.getChannel().close();
m_logger.error("Error when try to connecting to " + address);
closeChannel(future);
} else {
m_logger.info("Connected to CAT server at " + address);
return future;
......@@ -132,14 +125,14 @@ public class ChannelManager implements Task {
m_logger.error("Error when connect server " + address.getAddress(), e);
if (future != null) {
future.getChannel().close();
closeChannel(future);
}
}
return null;
}
public ChannelFuture getChannel() {
return m_activeFuture;
return m_activeChannelHolder.getActiveFuture();
}
@Override
......@@ -147,47 +140,63 @@ public class ChannelManager implements Task {
return "TcpSocketSender-ChannelManager";
}
private void initChannel(List<InetSocketAddress> addresses, String serverConfig) {
try {
private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {
StringBuilder sb = new StringBuilder();
for (InetSocketAddress address : addresses) {
sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append(',');
}
m_logger.info("init CAT server:" + sb.toString());
m_serverAddresses = addresses;
try {
int len = addresses.size();
for (int i = 0; i < len; i++) {
ChannelFuture future = createChannel(addresses.get(i));
InetSocketAddress address = addresses.get(i);
String hostAddress = address.getAddress().getHostAddress();
ChannelHolder holder = null;
if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {
holder = new ChannelHolder();
holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
} else {
ChannelFuture future = createChannel(address);
if (future != null) {
m_activeFuture = future;
m_activeIndex = i;
m_activeServerConfig = serverConfig;
break;
holder = new ChannelHolder();
holder.setActiveFuture(future).setConnectChanged(true);
}
}
if (holder != null) {
holder.setActiveIndex(i).setIp(hostAddress);
holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);
m_logger.info("success when init CAT server, new active holder" + holder.toString());
return holder;
}
}
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
// ignore
}
m_logger.info("Error when init CAT server " + sb.toString());
return null;
}
private boolean isChannelStalled() {
private boolean isChannelDisabled(ChannelFuture activeFuture) {
return activeFuture != null && !activeFuture.getChannel().isOpen();
}
private boolean isChannelStalled(ChannelFuture activeFuture) {
m_retriedTimes++;
int size = m_queue.size();
boolean stalled = m_activeFuture != null && size >= SIZE - 1;
boolean stalled = activeFuture != null && size >= TcpSocketSender.SIZE - 10;
if (stalled) {
if (m_retriedTimes >= 5) {
m_retriedTimes = 0;
m_logger.info("need to set active future to null. queue size:" + size + ",activeIndex:" + m_activeIndex);
return true;
} else {
m_logger.info("no need set active future to null due to retry time is not enough. queue size:" + size
+ ",retriedTimes:" + m_retriedTimes + ",activeIndex:" + m_activeIndex);
return false;
}
} else {
......@@ -203,7 +212,7 @@ public class ChannelManager implements Task {
return content.trim();
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
// ignore
}
return null;
}
......@@ -234,44 +243,48 @@ public class ChannelManager implements Task {
Pair<Boolean, String> pair = serverConfigChanged();
if (pair.getKey()) {
closeAllChannel();
String servers = pair.getValue();
List<InetSocketAddress> serverAddresses = parseSocketAddress(servers);
ChannelHolder newHolder = initChannel(serverAddresses, servers);
initChannel(serverAddresses, servers);
}
}
if (newHolder != null) {
if (newHolder.isConnectChanged()) {
ChannelHolder last = m_activeChannelHolder;
try {
if (isChannelStalled()) {
m_activeFuture.getChannel().close();
m_activeFuture = null;
m_activeIndex = -1;
m_activeChannelHolder = newHolder;
closeChannelHolder(last);
m_logger.info("switch active channel to " + m_activeChannelHolder);
} else {
m_activeChannelHolder = newHolder;
}
if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) {
m_activeFuture.getChannel().close();
m_activeFuture = null;
m_activeIndex = m_serverAddresses.size();
}
if (m_activeIndex == -1) {
m_activeIndex = m_serverAddresses.size();
}
if (m_lastFuture != null && m_lastFuture != m_activeFuture) {
m_lastFuture.getChannel().close();
m_lastFuture = null;
}
ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();
try {
if (isChannelStalled(activeFuture) || isChannelDisabled(activeFuture)) {
closeChannelHolder(m_activeChannelHolder);
}
} catch (Throwable e) {
m_logger.error(e.getMessage(), e);
}
try {
for (int i = 0; i < m_activeIndex; i++) {
ChannelFuture future = createChannel(m_serverAddresses.get(i));
int reconnectServers = m_activeChannelHolder.getActiveIndex();
if (reconnectServers == -1) {
reconnectServers = serverAddresses.size();
}
for (int i = 0; i < reconnectServers; i++) {
ChannelFuture future = createChannel(serverAddresses.get(i));
if (future != null) {
m_lastFuture = m_activeFuture;
m_activeFuture = future;
m_activeIndex = i;
ChannelFuture lastFuture = activeFuture;
m_activeChannelHolder.setActiveFuture(future);
m_activeChannelHolder.setActiveIndex(i);
closeChannel(lastFuture);
break;
}
}
......@@ -280,7 +293,7 @@ public class ChannelManager implements Task {
}
try {
Thread.sleep(2 * 1000L); // check every 2 seconds
Thread.sleep(10 * 1000L); // check every 2 seconds
} catch (InterruptedException e) {
// ignore
}
......@@ -290,7 +303,7 @@ public class ChannelManager implements Task {
private Pair<Boolean, String> serverConfigChanged() {
String current = loadServerConfig();
if (!StringUtils.isEmpty(current) && !current.equals(m_activeServerConfig)) {
if (!StringUtils.isEmpty(current) && !current.equals(m_activeChannelHolder.getActiveServerConfig())) {
return new Pair<Boolean, String>(true, current);
} else {
return new Pair<Boolean, String>(false, current);
......@@ -300,7 +313,7 @@ public class ChannelManager implements Task {
private boolean shouldCheckServerConfig(int count) {
int duration = 60 * 5;
if (count % (duration) == 0 || m_activeIndex == -1) {
if (count % duration == 0 || m_activeChannelHolder.getActiveIndex() == -1) {
return true;
} else {
return false;
......@@ -312,23 +325,96 @@ public class ChannelManager implements Task {
m_active = false;
}
public static class ChannelHolder {
private ChannelFuture m_activeFuture;
private int m_activeIndex = -1;
private String m_activeServerConfig;
private List<InetSocketAddress> m_serverAddresses;
private String m_ip;
private boolean m_connectChanged;
public ChannelFuture getActiveFuture() {
return m_activeFuture;
}
public int getActiveIndex() {
return m_activeIndex;
}
public String getActiveServerConfig() {
return m_activeServerConfig;
}
public String getIp() {
return m_ip;
}
public List<InetSocketAddress> getServerAddresses() {
return m_serverAddresses;
}
public boolean isConnectChanged() {
return m_connectChanged;
}
public ChannelHolder setActiveFuture(ChannelFuture activeFuture) {
m_activeFuture = activeFuture;
return this;
}
public ChannelHolder setActiveIndex(int activeIndex) {
m_activeIndex = activeIndex;
return this;
}
public ChannelHolder setActiveServerConfig(String activeServerConfig) {
m_activeServerConfig = activeServerConfig;
return this;
}
public ChannelHolder setConnectChanged(boolean connectChanged) {
m_connectChanged = connectChanged;
return this;
}
public ChannelHolder setIp(String ip) {
m_ip = ip;
return this;
}
public ChannelHolder setServerAddresses(List<InetSocketAddress> serverAddresses) {
m_serverAddresses = serverAddresses;
return this;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("active future :").append(m_activeFuture.getChannel().getRemoteAddress());
sb.append(" index:").append(m_activeIndex);
sb.append(" ip:").append(m_ip);
sb.append(" server config:").append(m_activeServerConfig);
return sb.toString();
}
}
private class ExceptionHandler extends SimpleChannelHandler {
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
m_error++;
if (m_error % 1000 == 0) {
m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
m_error++;
if (m_error % 1000 == 0) {
m_logger.warn("Channel disconnected due to " + e.getCause());
}
}
}
}
\ No newline at end of file
......@@ -6,6 +6,12 @@ import java.util.List;
import javax.servlet.ServletException;
import org.unidal.lookup.annotation.Inject;
import org.unidal.web.mvc.PageHandler;
import org.unidal.web.mvc.annotation.InboundActionMeta;
import org.unidal.web.mvc.annotation.OutboundActionMeta;
import org.unidal.web.mvc.annotation.PayloadMeta;
import com.dianping.cat.Constants;
import com.dianping.cat.helper.TimeUtil;
import com.dianping.cat.home.router.entity.Domain;
......@@ -15,12 +21,6 @@ import com.dianping.cat.report.service.ReportServiceManager;
import com.dianping.cat.system.SystemPage;
import com.dianping.cat.system.config.RouterConfigManager;
import org.unidal.lookup.annotation.Inject;
import org.unidal.web.mvc.PageHandler;
import org.unidal.web.mvc.annotation.InboundActionMeta;
import org.unidal.web.mvc.annotation.OutboundActionMeta;
import org.unidal.web.mvc.annotation.PayloadMeta;
public class Handler implements PageHandler<Context> {
@Inject
private JspViewer m_jspViewer;
......@@ -50,6 +50,9 @@ public class Handler implements PageHandler<Context> {
switch (action) {
case API:
Domain domainConfig = m_configManager.getRouterConfig().findDomain(payload.getDomain());
if (domainConfig == null) {
if (report != null) {
Domain domain = report.findDomain(payload.getDomain());
String str = null;
......@@ -71,6 +74,9 @@ public class Handler implements PageHandler<Context> {
model.setContent(buildServerStr(servers));
}
} else {
model.setContent(buildServerStr(domainConfig.getServers()));
}
break;
case MODEL:
if (report != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册