提交 92f28c00 编写于 作者: wu-sheng's avatar wu-sheng

1.移除无用的DataSenderFactory,使用新的DataSenderFactoryWithBalance

2.DataSenderFactoryWithBalance增加close操作,简化连接替换,重新balance操作
3.移除SenderStatus无用状态
上级 2f923b26
package com.ai.cloud.skywalking.buffer; package com.ai.cloud.skywalking.buffer;
import static com.ai.cloud.skywalking.conf.Config.Buffer.BUFFER_MAX_SIZE;
import static com.ai.cloud.skywalking.conf.Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL;
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_CONSUMER;
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_WAIT_TIME;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.ai.cloud.skywalking.conf.Config; import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.conf.Constants; import com.ai.cloud.skywalking.conf.Constants;
import com.ai.cloud.skywalking.protocol.Span; import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.selfexamination.HealthCollector; import com.ai.cloud.skywalking.selfexamination.HealthCollector;
import com.ai.cloud.skywalking.selfexamination.HeathReading; import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.sender.DataSenderFactory;
import com.ai.cloud.skywalking.sender.DataSenderFactoryWithBalance; import com.ai.cloud.skywalking.sender.DataSenderFactoryWithBalance;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import static com.ai.cloud.skywalking.conf.Config.Buffer.BUFFER_MAX_SIZE;
import static com.ai.cloud.skywalking.conf.Config.Consumer.*;
public class BufferGroup { public class BufferGroup {
private static Logger logger = Logger.getLogger(BufferGroup.class.getName()); private static Logger logger = Logger.getLogger(BufferGroup.class.getName());
private String groupName; private String groupName;
...@@ -67,7 +68,7 @@ public class BufferGroup { ...@@ -67,7 +68,7 @@ public class BufferGroup {
} }
bool = true; bool = true;
if (data.length() + dataBuffer[i].toString().length() >= Config.Sender.MAX_SEND_LENGTH) { if (data.length() + dataBuffer[i].toString().length() >= Config.Sender.MAX_SEND_LENGTH) {
while (!DataSenderFactory.getSender().send(data.toString())) { while (!DataSenderFactoryWithBalance.getSender().send(data.toString())) {
try { try {
Thread.sleep(CONSUMER_FAIL_RETRY_WAIT_INTERVAL); Thread.sleep(CONSUMER_FAIL_RETRY_WAIT_INTERVAL);
} catch (InterruptedException e) { } catch (InterruptedException e) {
......
...@@ -9,8 +9,12 @@ import java.nio.ByteBuffer; ...@@ -9,8 +9,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DataSender { public class DataSender {
private static Logger logger = Logger.getLogger(DataSender.class.getName());
private SocketChannel socketChannel; private SocketChannel socketChannel;
private Selector selector; private Selector selector;
private InetSocketAddress socketAddress; private InetSocketAddress socketAddress;
...@@ -63,14 +67,18 @@ public class DataSender { ...@@ -63,14 +67,18 @@ public class DataSender {
return this.socketAddress; return this.socketAddress;
} }
public void closeConnect() throws IOException { public void close(){
if (socketChannel != null) { if (socketChannel != null) {
socketChannel.close(); try {
socketChannel.close();
} catch (IOException e) {
logger.log(Level.ALL, "close connection Failed");
}
} }
} }
public enum SenderStatus { public enum SenderStatus {
READY, FAILED, SWITCHING READY, FAILED
} }
public SenderStatus getStatus() { public SenderStatus getStatus() {
......
package com.ai.cloud.skywalking.sender;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.selfexamination.HealthCollector;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.util.StringUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Level;
import java.util.logging.Logger;
import static com.ai.cloud.skywalking.conf.Config.Sender.CONNECT_PERCENT;
import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_GET_SENDER_WAIT_INTERVAL;
import static com.ai.cloud.skywalking.conf.Config.SenderChecker.CHECK_POLLING_TIME;
public class DataSenderFactory {
private static Logger logger = Logger.getLogger(DataSenderFactory.class.getName());
private static List<InetSocketAddress> socketAddresses = new ArrayList<InetSocketAddress>();
private static List<InetSocketAddress> unUsedSocketAddresses = new ArrayList<InetSocketAddress>();
private static List<DataSender> availableSenders = new ArrayList<DataSender>();
private static Object lock = new Object();
static {
try {
if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) {
throw new IllegalArgumentException("Collection service configuration error.");
}
//过滤重复地址
Set<InetSocketAddress> tmpSocktAddress = new HashSet<InetSocketAddress>();
for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) {
String[] server = serverConfig.split(":");
if (server.length != 2)
throw new IllegalArgumentException("Collection service configuration error.");
tmpSocktAddress.add(new InetSocketAddress(server[0], Integer.valueOf(server[1])));
}
socketAddresses.addAll(tmpSocktAddress);
} catch (Exception e) {
logger.log(Level.ALL, "Collection service configuration error.", e);
System.exit(-1);
}
new DataSenderChecker().start();
}
public static DataSender getSender() {
while (availableSenders.size() == 0) {
try {
Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL);
} catch (InterruptedException e) {
logger.log(Level.ALL, "Sleep failure", e);
}
}
return availableSenders.get(ThreadLocalRandom.current().nextInt(0, availableSenders.size()));
}
static class DataSenderChecker extends Thread {
private int availableSize;
public DataSenderChecker() {
super("DataSenderChecker");
if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) {
logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100");
System.exit(-1);
}
availableSize = (int) Math.ceil(socketAddresses.size() * ((1.0 * CONNECT_PERCENT / 100) % 100));
// 初始化DataSender
List<SocketAddress> usedSocketAddress = new ArrayList<SocketAddress>();
int index;
while (availableSenders.size() < availableSize) {
// 随机获取服务器地址
index = ThreadLocalRandom.current().nextInt(socketAddresses.size());
try {
availableSenders.add(new DataSender(socketAddresses.get(index)));
usedSocketAddress.add(socketAddresses.get(index));
} catch (IOException e) {
unUsedSocketAddresses.add(socketAddresses.get(index));
}
}
unUsedSocketAddresses = new ArrayList<InetSocketAddress>(socketAddresses);
unUsedSocketAddresses.removeAll(usedSocketAddress);
}
public void run() {
Iterator<InetSocketAddress> unUsedSocketAddressIterator;
InetSocketAddress tmpSocketAddress;
while (true) {
unUsedSocketAddressIterator = unUsedSocketAddresses.iterator();
while (unUsedSocketAddressIterator.hasNext()) {
tmpSocketAddress = unUsedSocketAddressIterator.next();
if (availableSenders.size() >= availableSize) {
HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.INFO, "the num of available senders is enough.");
break;
}
synchronized (lock) {
try {
HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.INFO, "increasing available senders.");
availableSenders.add(new DataSender(tmpSocketAddress));
unUsedSocketAddresses.remove(tmpSocketAddress);
} catch (IOException e) {
}
}
}
if (availableSenders.size() >= availableSize) {
HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.WARNING, "the num of available senders is not enough (" + availableSenders.size() + ").");
}
try {
Thread.sleep(CHECK_POLLING_TIME);
} catch (InterruptedException e) {
logger.log(Level.ALL, "Sleep Failure");
}
}
}
}
public static void unRegister(DataSender sender) {
synchronized (lock) {
availableSenders.remove(sender);
unUsedSocketAddresses.add(sender.getServerIp());
}
}
}
package com.ai.cloud.skywalking.sender; package com.ai.cloud.skywalking.sender;
import com.ai.cloud.skywalking.conf.Config; import static com.ai.cloud.skywalking.conf.Config.Sender.CHECKER_THREAD_WAIT_INTERVAL;
import com.ai.cloud.skywalking.util.StringUtil; import static com.ai.cloud.skywalking.conf.Config.Sender.CLOSE_SENDER_COUNTDOWN;
import static com.ai.cloud.skywalking.conf.Config.Sender.CONNECT_PERCENT;
import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_FIND_CONNECTION_SENDER;
import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_GET_SENDER_WAIT_INTERVAL;
import static com.ai.cloud.skywalking.conf.Config.Sender.SWITCH_SENDER_INTERVAL;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -13,195 +17,200 @@ import java.util.concurrent.ThreadLocalRandom; ...@@ -13,195 +17,200 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import static com.ai.cloud.skywalking.conf.Config.Sender.*; import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.util.StringUtil;
public class DataSenderFactoryWithBalance { public class DataSenderFactoryWithBalance {
private static Logger logger = Logger.getLogger(DataSenderFactoryWithBalance.class.getName()); private static Logger logger = Logger
// unUsedServerAddress存放没有使用的服务器地址, .getLogger(DataSenderFactoryWithBalance.class.getName());
private static List<InetSocketAddress> unusedServerAddresses = new ArrayList<InetSocketAddress>(); // unUsedServerAddress存放没有使用的服务器地址,
private static List<InetSocketAddress> unusedServerAddresses = new ArrayList<InetSocketAddress>();
private static List<DataSender> usingDataSender = new ArrayList<DataSender>();
private static int maxKeepConnectingSenderSize; private static List<DataSender> usingDataSender = new ArrayList<DataSender>();
private static Object lock = new Object(); private static int maxKeepConnectingSenderSize;
private static boolean NEED_ADD_SENDER_FLAG = false; private static boolean NEED_ADD_SENDER_FLAG = false;
private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) { private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) {
if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) { if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) {
logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100"); logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100");
System.exit(-1); System.exit(-1);
} }
return (int) Math.ceil(allAddressSize * ((1.0 * CONNECT_PERCENT / 100) % 100)); return (int) Math.ceil(allAddressSize
} * ((1.0 * CONNECT_PERCENT / 100) % 100));
}
// 初始化服务端的地址数据
static { // 初始化服务端的地址数据
// 获取数据 static {
if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) { // 获取数据
throw new IllegalArgumentException("Collection service configuration error."); if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) {
} throw new IllegalArgumentException(
"Collection service configuration error.");
// 初始化地址 }
Set<InetSocketAddress> tmpInetSocketAddress = new HashSet<InetSocketAddress>();
for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) { // 初始化地址
String[] server = serverConfig.split(":"); Set<InetSocketAddress> tmpInetSocketAddress = new HashSet<InetSocketAddress>();
if (server.length != 2) for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) {
throw new IllegalArgumentException("Collection service configuration error."); String[] server = serverConfig.split(":");
tmpInetSocketAddress.add(new InetSocketAddress(server[0], Integer.valueOf(server[1]))); if (server.length != 2)
} throw new IllegalArgumentException(
"Collection service configuration error.");
unusedServerAddresses.addAll(tmpInetSocketAddress); tmpInetSocketAddress.add(new InetSocketAddress(server[0], Integer
.valueOf(server[1])));
//根据配置的服务器集群的地址,来计算保持连接的Sender的数量 }
maxKeepConnectingSenderSize = calculateMaxKeeperConnectingSenderSize(tmpInetSocketAddress.size());
unusedServerAddresses.addAll(tmpInetSocketAddress);
// 初始化的发送程序 // 根据配置的服务器集群的地址,来计算保持连接的Sender的数量
int index = 0; maxKeepConnectingSenderSize = calculateMaxKeeperConnectingSenderSize(tmpInetSocketAddress
while (usingDataSender.size() < maxKeepConnectingSenderSize) { .size());
index = ThreadLocalRandom.current().nextInt(0, unusedServerAddresses.size());
try { // 初始化的发送程序
usingDataSender.add(new DataSender(unusedServerAddresses.get(index))); int index = 0;
unusedServerAddresses.remove(index); while (usingDataSender.size() < maxKeepConnectingSenderSize) {
} catch (IOException e) { index = ThreadLocalRandom.current().nextInt(0,
// 服务器连接不上 unusedServerAddresses.size());
logger.log(Level.SEVERE, "Failed to connect server[" + try {
unusedServerAddresses.get(index).getHostName() + "]"); usingDataSender.add(new DataSender(unusedServerAddresses
continue; .get(index)));
} unusedServerAddresses.remove(index);
} } catch (IOException e) {
// 服务器连接不上
new DataSenderChecker().start(); logger.log(Level.SEVERE, "Failed to connect server["
} + unusedServerAddresses.get(index).getHostName() + "]");
continue;
// 获取连接 }
}
public static DataSender getSender() {
DataSender readySender = null; new DataSenderChecker().start();
while (true) { }
int index = ThreadLocalRandom.current().nextInt(0, usingDataSender.size());
if (usingDataSender.get(index).getStatus() == DataSender.SenderStatus.READY) { // 获取连接
readySender = usingDataSender.get(index);
break; public static DataSender getSender() {
} DataSender readySender = null;
while (true) {
if (readySender == null) { int index = ThreadLocalRandom.current().nextInt(0,
try { usingDataSender.size());
Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL); if (usingDataSender.get(index).getStatus() == DataSender.SenderStatus.READY) {
} catch (InterruptedException e) { readySender = usingDataSender.get(index);
logger.log(Level.ALL, "Sleep failed"); break;
} }
}
if (readySender == null) {
} try {
Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL);
return readySender; } catch (InterruptedException e) {
} logger.log(Level.ALL, "Sleep failed");
}
// 定时Sender状态检查 }
public static class DataSenderChecker extends Thread {
public DataSenderChecker() { }
super("Data-Sender-Checker");
} return readySender;
}
@Override
public void run() { // 定时Sender状态检查
long sleepTime = 0; public static class DataSenderChecker extends Thread {
while (true) { public DataSenderChecker() {
// 检查是否需要新增 super("Data-Sender-Checker");
// NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值 }
if (NEED_ADD_SENDER_FLAG) {
DataSender newSender; @Override
for (int i = 0; i < usingDataSender.size(); i++) { public void run() {
if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) { long sleepTime = 0;
// 正在使用的Sender的数量 <= maxKeepConnectingSenderSize while (true) {
// 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量 // 检查是否需要新增
// 可替换的服务器数量 = 剩余服务器地址数量 // NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值
// 当剩余服务器地址数量 <= 0 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思 if (NEED_ADD_SENDER_FLAG) {
// 当剩余服务器地址数量 > 0 时, 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环 DataSender newSender;
while ((newSender = findReadySender()) == null) { for (int i = 0; i < usingDataSender.size(); i++) {
try { if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) {
Thread.sleep(RETRY_FIND_CONNECTION_SENDER); usingDataSender.get(i).close();
} catch (InterruptedException e) { // 正在使用的Sender的数量 <= maxKeepConnectingSenderSize
logger.log(Level.ALL, "Sleep failed."); // 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量
} // 可替换的服务器数量 = 剩余服务器地址数量
} // 当剩余服务器地址数量 <= 0
usingDataSender.set(i, newSender); // 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思
unusedServerAddresses.add(usingDataSender.get(i).getServerIp()); // 当剩余服务器地址数量 > 0 时,
if (usingDataSender.size() >= maxKeepConnectingSenderSize) { // 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环
NEED_ADD_SENDER_FLAG = false; while ((newSender = findReadySender()) == null) {
break; try {
} Thread.sleep(RETRY_FIND_CONNECTION_SENDER);
} } catch (InterruptedException e) {
} logger.log(Level.ALL, "Sleep failed.");
}
}
} usingDataSender.set(i, newSender);
unusedServerAddresses.add(usingDataSender.get(i)
// 检查是否需要替换 .getServerIp());
if (sleepTime >= SWITCH_SENDER_INTERVAL) { if (usingDataSender.size() >= maxKeepConnectingSenderSize) {
DataSender toBeSwitchSender; NEED_ADD_SENDER_FLAG = false;
DataSender tmpSender; break;
while (true) { }
int toBeSwitchIndex = ThreadLocalRandom.current().nextInt(0, usingDataSender.size() - 1); }
toBeSwitchSender = usingDataSender.get(toBeSwitchIndex); }
if (toBeSwitchSender.getStatus() == DataSender.SenderStatus.READY) {
tmpSender = findReadySender(); }
if (tmpSender != null) {
usingDataSender.set(toBeSwitchIndex, tmpSender); // 检查是否需要替换
try { if (sleepTime >= SWITCH_SENDER_INTERVAL) {
Thread.sleep(CLOSE_SENDER_COUNTDOWN); DataSender toBeSwitchSender;
} catch (InterruptedException e) { DataSender tmpSender;
logger.log(Level.ALL, "Sleep Failed"); int toBeSwitchIndex = ThreadLocalRandom.current().nextInt(
} 0, usingDataSender.size() - 1);
unusedServerAddresses.remove(tmpSender.getServerIp()); toBeSwitchSender = usingDataSender.get(toBeSwitchIndex);
unusedServerAddresses.add(toBeSwitchSender.getServerIp()); tmpSender = findReadySender();
} if (tmpSender != null) {
break; usingDataSender.set(toBeSwitchIndex, tmpSender);
} try {
} Thread.sleep(CLOSE_SENDER_COUNTDOWN);
sleepTime = 0; } catch (InterruptedException e) {
} logger.log(Level.ALL, "Sleep Failed");
}
// toBeSwitchSender.close();
sleepTime += CHECKER_THREAD_WAIT_INTERVAL; unusedServerAddresses.remove(tmpSender.getServerIp());
try { unusedServerAddresses.add(toBeSwitchSender
Thread.sleep(CHECKER_THREAD_WAIT_INTERVAL); .getServerIp());
} catch (InterruptedException e) { }
logger.log(Level.ALL, "Sleep failed"); sleepTime = 0;
} }
} //
} sleepTime += CHECKER_THREAD_WAIT_INTERVAL;
try {
} Thread.sleep(CHECKER_THREAD_WAIT_INTERVAL);
} catch (InterruptedException e) {
private static DataSender findReadySender() { logger.log(Level.ALL, "Sleep failed");
DataSender result = null; }
for (InetSocketAddress serverAddress : unusedServerAddresses) {
try { }
result = new DataSender(serverAddress); }
break;
} catch (IOException e) { }
if (result != null) {
try { private static DataSender findReadySender() {
result.closeConnect(); DataSender result = null;
} catch (IOException ex) { for (InetSocketAddress serverAddress : unusedServerAddresses) {
logger.log(Level.ALL, "Failed to close socket[" + try {
serverAddress.getHostName() + "]"); result = new DataSender(serverAddress);
} break;
} } catch (IOException e) {
continue; if (result != null) {
} result.close();
} }
return result; continue;
} }
}
public static void unRegister(DataSender socket) { return result;
int index = usingDataSender.indexOf(socket); }
if (index != -1) {
usingDataSender.get(index).setStatus(DataSender.SenderStatus.FAILED); public static void unRegister(DataSender socket) {
} int index = usingDataSender.indexOf(socket);
NEED_ADD_SENDER_FLAG = true; if (index != -1) {
} usingDataSender.get(index)
.setStatus(DataSender.SenderStatus.FAILED);
}
NEED_ADD_SENDER_FLAG = true;
}
} }
package com.ai.cloud.skywalking.util; package com.ai.cloud.skywalking.util;
import com.ai.cloud.skywalking.conf.Config;
import java.util.UUID; import java.util.UUID;
public final class TraceIdGenerator { public final class TraceIdGenerator {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册