提交 9f180b75 编写于 作者: Y youyong205

sort the code

上级 50848437
......@@ -61,87 +61,6 @@ public class ChannelManager implements Task {
private String m_lastServers;
private List<InetSocketAddress> parse(String content) {
try {
List<String> strs = Splitters.by(";").noEmptyItem().split(content);
List<InetSocketAddress> address = new ArrayList<InetSocketAddress>();
for (String str : strs) {
List<String> items = Splitters.by(":").noEmptyItem().split(str);
address.add(new InetSocketAddress(items.get(0), Integer.parseInt(items.get(1))));
}
return address;
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
}
return new ArrayList<InetSocketAddress>();
}
private String getServerConfig() {
try {
String url = m_configManager.getServerConfigUrl();
InputStream currentServer = Urls.forIO().readTimeout(3000).connectTimeout(1000).openStream(url);
String content = Files.forIO().readFrom(currentServer, "utf-8");
return content.trim();
} catch (Exception e) {
}
return null;
}
private Pair<Boolean, String> serverConfigChanged() {
String current = getServerConfig();
if (current != null && !current.equals(m_lastServers)) {
return new Pair<Boolean, String>(true, current);
} else {
return new Pair<Boolean, String>(false, current);
}
}
private void closeAllChannel() {
try {
if (m_activeFuture != null) {
m_activeFuture.getChannel().close();
}
if (m_lastFuture != null) {
m_lastFuture.getChannel().close();
}
m_activeIndex = -1;
} catch (Exception e) {
// ignore
}
}
private void initChannel(List<InetSocketAddress> addresses) {
try {
StringBuilder sb = new StringBuilder();
for (InetSocketAddress address : addresses) {
sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append(',');
}
m_logger.info("init CAT server:" + sb.toString());
m_serverAddresses = addresses;
int len = addresses.size();
for (int i = 0; i < len; i++) {
ChannelFuture future = createChannel(addresses.get(i));
if (future != null) {
m_activeFuture = future;
m_activeIndex = i;
break;
}
}
} catch (Exception e) {
e.printStackTrace();
// ignore
}
}
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
ClientConfigManager configManager) {
m_logger = logger;
......@@ -164,7 +83,7 @@ public class ChannelManager implements Task {
bootstrap.setOption("keepAlive", true);
m_bootstrap = bootstrap;
String serverConfig = getServerConfig();
if (serverConfig != null) {
......@@ -178,6 +97,20 @@ public class ChannelManager implements Task {
}
}
private void closeAllChannel() {
try {
if (m_activeFuture != null) {
m_activeFuture.getChannel().close();
}
if (m_lastFuture != null) {
m_lastFuture.getChannel().close();
}
m_activeIndex = -1;
} catch (Exception e) {
// ignore
}
}
private ChannelFuture createChannel(InetSocketAddress address) {
ChannelFuture future = null;
......@@ -215,6 +148,46 @@ public class ChannelManager implements Task {
return "TcpSocketSender-ChannelManager";
}
private String getServerConfig() {
try {
String url = m_configManager.getServerConfigUrl();
InputStream currentServer = Urls.forIO().readTimeout(3000).connectTimeout(1000).openStream(url);
String content = Files.forIO().readFrom(currentServer, "utf-8");
return content.trim();
} catch (Exception e) {
}
return null;
}
private void initChannel(List<InetSocketAddress> addresses) {
try {
StringBuilder sb = new StringBuilder();
for (InetSocketAddress address : addresses) {
sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append(',');
}
m_logger.info("init CAT server:" + sb.toString());
m_serverAddresses = addresses;
int len = addresses.size();
for (int i = 0; i < len; i++) {
ChannelFuture future = createChannel(addresses.get(i));
if (future != null) {
m_activeFuture = future;
m_activeIndex = i;
break;
}
}
} catch (Exception e) {
e.printStackTrace();
// ignore
}
}
private boolean isChannelStalled() {
m_retriedTimes++;
int size = m_queue.size();
......@@ -235,15 +208,21 @@ public class ChannelManager implements Task {
}
}
private boolean shouldCheckServerConfig(int count) {
//return true;
int duration = 3600;
if (count % (duration) == 0) {
return true;
} else {
return false;
}
private List<InetSocketAddress> parse(String content) {
try {
List<String> strs = Splitters.by(";").noEmptyItem().split(content);
List<InetSocketAddress> address = new ArrayList<InetSocketAddress>();
for (String str : strs) {
List<String> items = Splitters.by(":").noEmptyItem().split(str);
address.add(new InetSocketAddress(items.get(0), Integer.parseInt(items.get(1))));
}
return address;
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
}
return new ArrayList<InetSocketAddress>();
}
@Override
......@@ -308,6 +287,26 @@ public class ChannelManager implements Task {
}
}
private Pair<Boolean, String> serverConfigChanged() {
String current = getServerConfig();
if (current != null && !current.equals(m_lastServers)) {
return new Pair<Boolean, String>(true, current);
} else {
return new Pair<Boolean, String>(false, current);
}
}
private boolean shouldCheckServerConfig(int count) {
int duration = 3600;
if (count % (duration) == 0) {
return true;
} else {
return false;
}
}
@Override
public void shutdown() {
m_active = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册