提交 49dc8232 编写于 作者: wu-sheng's avatar wu-sheng

1.优化检查机制,移除NEED_ADD_SENDER_FLAG,此标记位在多线程间协同,有可能造成数据不一致。

上级 92f28c00
...@@ -29,7 +29,6 @@ public class DataSenderFactoryWithBalance { ...@@ -29,7 +29,6 @@ public class DataSenderFactoryWithBalance {
private static List<DataSender> usingDataSender = new ArrayList<DataSender>(); private static List<DataSender> usingDataSender = new ArrayList<DataSender>();
private static int maxKeepConnectingSenderSize; private static int maxKeepConnectingSenderSize;
private static boolean NEED_ADD_SENDER_FLAG = false;
private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) { private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) {
if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) { if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) {
...@@ -122,33 +121,30 @@ public class DataSenderFactoryWithBalance { ...@@ -122,33 +121,30 @@ public class DataSenderFactoryWithBalance {
while (true) { while (true) {
// 检查是否需要新增 // 检查是否需要新增
// NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值 // NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值
if (NEED_ADD_SENDER_FLAG) { DataSender newSender;
DataSender newSender; for (int i = 0; i < usingDataSender.size(); i++) {
for (int i = 0; i < usingDataSender.size(); i++) { if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) {
if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) { usingDataSender.get(i).close();
usingDataSender.get(i).close(); // 正在使用的Sender的数量 <= maxKeepConnectingSenderSize
// 正在使用的Sender的数量 <= maxKeepConnectingSenderSize // 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量
// 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量 // 可替换的服务器数量 = 剩余服务器地址数量
// 可替换的服务器数量 = 剩余服务器地址数量 // 当剩余服务器地址数量 <= 0
// 当剩余服务器地址数量 <= 0 // 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思
// 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思 // 当剩余服务器地址数量 > 0 时,
// 当剩余服务器地址数量 > 0 时, // 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环
// 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环 while ((newSender = findReadySender()) == null) {
while ((newSender = findReadySender()) == null) { try {
try { Thread.sleep(RETRY_FIND_CONNECTION_SENDER);
Thread.sleep(RETRY_FIND_CONNECTION_SENDER); } catch (InterruptedException e) {
} catch (InterruptedException e) { logger.log(Level.ALL, "Sleep failed.");
logger.log(Level.ALL, "Sleep failed.");
}
}
usingDataSender.set(i, newSender);
unusedServerAddresses.add(usingDataSender.get(i)
.getServerIp());
if (usingDataSender.size() >= maxKeepConnectingSenderSize) {
NEED_ADD_SENDER_FLAG = false;
break;
} }
} }
usingDataSender.set(i, newSender);
unusedServerAddresses.add(usingDataSender.get(i)
.getServerIp());
if (usingDataSender.size() >= maxKeepConnectingSenderSize) {
break;
}
} }
} }
...@@ -211,6 +207,5 @@ public class DataSenderFactoryWithBalance { ...@@ -211,6 +207,5 @@ public class DataSenderFactoryWithBalance {
usingDataSender.get(index) usingDataSender.get(index)
.setStatus(DataSender.SenderStatus.FAILED); .setStatus(DataSender.SenderStatus.FAILED);
} }
NEED_ADD_SENDER_FLAG = true;
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册