提交 aa37940d 编写于 作者: Z zhangxin10

解决DataSenderFactory中的DataSenderChecker对于按百分比选择连接,处理不正确

上级 33e38fa7
...@@ -21,8 +21,8 @@ public class DataSenderFactory { ...@@ -21,8 +21,8 @@ public class DataSenderFactory {
private static Logger logger = Logger.getLogger(DataSenderFactory.class.getName()); private static Logger logger = Logger.getLogger(DataSenderFactory.class.getName());
private static Set<SocketAddress> socketAddresses = new HashSet<SocketAddress>(); private static List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
private static Set<SocketAddress> unUsedSocketAddresses = new HashSet<SocketAddress>(); private static List<SocketAddress> unUsedSocketAddresses = new ArrayList<SocketAddress>();
private static List<DataSender> availableSenders = new ArrayList<DataSender>(); private static List<DataSender> availableSenders = new ArrayList<DataSender>();
private static Object lock = new Object(); private static Object lock = new Object();
...@@ -31,13 +31,17 @@ public class DataSenderFactory { ...@@ -31,13 +31,17 @@ public class DataSenderFactory {
if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) { if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) {
throw new IllegalArgumentException("Collection service configuration error."); throw new IllegalArgumentException("Collection service configuration error.");
} }
//过滤重复地址
Set<SocketAddress> tmpSocktAddress = new HashSet<SocketAddress>();
for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) { for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) {
String[] server = serverConfig.split(":"); String[] server = serverConfig.split(":");
if (server.length != 2) if (server.length != 2)
throw new IllegalArgumentException("Collection service configuration error."); throw new IllegalArgumentException("Collection service configuration error.");
socketAddresses.add(new InetSocketAddress(server[0], Integer.valueOf(server[1]))); tmpSocktAddress.add(new InetSocketAddress(server[0], Integer.valueOf(server[1])));
} }
socketAddresses.addAll(tmpSocktAddress);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.ALL, "Collection service configuration error.", e); logger.log(Level.ALL, "Collection service configuration error.", e);
System.exit(-1); System.exit(-1);
...@@ -51,7 +55,7 @@ public class DataSenderFactory { ...@@ -51,7 +55,7 @@ public class DataSenderFactory {
try { try {
Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL); Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.log(Level.ALL, "Sleep failure", e); logger.log(Level.ALL, "Sleep failure", e);
} }
} }
return availableSenders.get(ThreadLocalRandom.current().nextInt(0, availableSenders.size())); return availableSenders.get(ThreadLocalRandom.current().nextInt(0, availableSenders.size()));
...@@ -72,18 +76,18 @@ public class DataSenderFactory { ...@@ -72,18 +76,18 @@ public class DataSenderFactory {
// 初始化DataSender // 初始化DataSender
List<SocketAddress> usedSocketAddress = new ArrayList<SocketAddress>(); List<SocketAddress> usedSocketAddress = new ArrayList<SocketAddress>();
for (SocketAddress socketAddress : socketAddresses) { int index;
if (availableSenders.size() >= availableSize) { while (availableSenders.size() < availableSize) {
break; // 随机获取服务器地址
} index = ThreadLocalRandom.current().nextInt(socketAddresses.size());
try { try {
availableSenders.add(new DataSender(socketAddress)); availableSenders.add(new DataSender(socketAddresses.get(index)));
usedSocketAddress.add(socketAddress); usedSocketAddress.add(socketAddresses.get(index));
} catch (IOException e) { } catch (IOException e) {
unUsedSocketAddresses.add(socketAddress); unUsedSocketAddresses.add(socketAddresses.get(index));
} }
} }
unUsedSocketAddresses = new HashSet<SocketAddress>(socketAddresses); unUsedSocketAddresses = new ArrayList<SocketAddress>(socketAddresses);
unUsedSocketAddresses.removeAll(usedSocketAddress); unUsedSocketAddresses.removeAll(usedSocketAddress);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册