diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/buffer/BufferGroup.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/buffer/BufferGroup.java index a0c715e033378a3610deae7e5566210f010cfea4..4ed5cd364506f0fc28bfffc48c12fa282b3f6c95 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/buffer/BufferGroup.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/buffer/BufferGroup.java @@ -1,21 +1,22 @@ package com.ai.cloud.skywalking.buffer; +import static com.ai.cloud.skywalking.conf.Config.Buffer.BUFFER_MAX_SIZE; +import static com.ai.cloud.skywalking.conf.Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL; +import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_CONSUMER; +import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_WAIT_TIME; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + import com.ai.cloud.skywalking.conf.Config; import com.ai.cloud.skywalking.conf.Constants; import com.ai.cloud.skywalking.protocol.Span; import com.ai.cloud.skywalking.selfexamination.HealthCollector; import com.ai.cloud.skywalking.selfexamination.HeathReading; -import com.ai.cloud.skywalking.sender.DataSenderFactory; import com.ai.cloud.skywalking.sender.DataSenderFactoryWithBalance; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static com.ai.cloud.skywalking.conf.Config.Buffer.BUFFER_MAX_SIZE; -import static com.ai.cloud.skywalking.conf.Config.Consumer.*; - public class BufferGroup { private static Logger logger = Logger.getLogger(BufferGroup.class.getName()); private String groupName; @@ -67,7 +68,7 @@ public class BufferGroup { } bool = true; if (data.length() + dataBuffer[i].toString().length() >= Config.Sender.MAX_SEND_LENGTH) { - while (!DataSenderFactory.getSender().send(data.toString())) { + while (!DataSenderFactoryWithBalance.getSender().send(data.toString())) { try { Thread.sleep(CONSUMER_FAIL_RETRY_WAIT_INTERVAL); } catch (InterruptedException e) { diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java index 7f4b26abbea6c6ea603d0a414437fa0a851f1aca..63f2f55ba18ddb39c0c02c43d6bb51c09849df22 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java @@ -9,8 +9,12 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.util.logging.Level; +import java.util.logging.Logger; public class DataSender { + private static Logger logger = Logger.getLogger(DataSender.class.getName()); + private SocketChannel socketChannel; private Selector selector; private InetSocketAddress socketAddress; @@ -63,14 +67,18 @@ public class DataSender { return this.socketAddress; } - public void closeConnect() throws IOException { + public void close(){ if (socketChannel != null) { - socketChannel.close(); + try { + socketChannel.close(); + } catch (IOException e) { + logger.log(Level.ALL, "close connection Failed"); + } } } public enum SenderStatus { - READY, FAILED, SWITCHING + READY, FAILED } public SenderStatus getStatus() { diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactory.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactory.java deleted file mode 100644 index 39b80f215aa54b1719b93aad3e6014316530fe44..0000000000000000000000000000000000000000 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactory.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.ai.cloud.skywalking.sender; - -import com.ai.cloud.skywalking.conf.Config; -import com.ai.cloud.skywalking.selfexamination.HealthCollector; -import com.ai.cloud.skywalking.selfexamination.HeathReading; -import com.ai.cloud.skywalking.util.StringUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static com.ai.cloud.skywalking.conf.Config.Sender.CONNECT_PERCENT; -import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_GET_SENDER_WAIT_INTERVAL; -import static com.ai.cloud.skywalking.conf.Config.SenderChecker.CHECK_POLLING_TIME; - -public class DataSenderFactory { - - private static Logger logger = Logger.getLogger(DataSenderFactory.class.getName()); - - private static List socketAddresses = new ArrayList(); - private static List unUsedSocketAddresses = new ArrayList(); - private static List availableSenders = new ArrayList(); - private static Object lock = new Object(); - - static { - try { - if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) { - throw new IllegalArgumentException("Collection service configuration error."); - } - //过滤重复地址 - Set tmpSocktAddress = new HashSet(); - for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) { - String[] server = serverConfig.split(":"); - if (server.length != 2) - throw new IllegalArgumentException("Collection service configuration error."); - tmpSocktAddress.add(new InetSocketAddress(server[0], Integer.valueOf(server[1]))); - } - - socketAddresses.addAll(tmpSocktAddress); - - } catch (Exception e) { - logger.log(Level.ALL, "Collection service configuration error.", e); - System.exit(-1); - } - - new DataSenderChecker().start(); - } - - public static DataSender getSender() { - while (availableSenders.size() == 0) { - try { - Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep failure", e); - } - } - return availableSenders.get(ThreadLocalRandom.current().nextInt(0, availableSenders.size())); - } - - static class DataSenderChecker extends Thread { - - private int availableSize; - - public DataSenderChecker() { - super("DataSenderChecker"); - - if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) { - logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100"); - System.exit(-1); - } - availableSize = (int) Math.ceil(socketAddresses.size() * ((1.0 * CONNECT_PERCENT / 100) % 100)); - // 初始化DataSender - List usedSocketAddress = new ArrayList(); - - int index; - while (availableSenders.size() < availableSize) { - // 随机获取服务器地址 - index = ThreadLocalRandom.current().nextInt(socketAddresses.size()); - try { - availableSenders.add(new DataSender(socketAddresses.get(index))); - usedSocketAddress.add(socketAddresses.get(index)); - } catch (IOException e) { - unUsedSocketAddresses.add(socketAddresses.get(index)); - } - } - unUsedSocketAddresses = new ArrayList(socketAddresses); - unUsedSocketAddresses.removeAll(usedSocketAddress); - } - - public void run() { - Iterator unUsedSocketAddressIterator; - InetSocketAddress tmpSocketAddress; - while (true) { - unUsedSocketAddressIterator = unUsedSocketAddresses.iterator(); - while (unUsedSocketAddressIterator.hasNext()) { - - tmpSocketAddress = unUsedSocketAddressIterator.next(); - if (availableSenders.size() >= availableSize) { - HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.INFO, "the num of available senders is enough."); - break; - } - - synchronized (lock) { - try { - HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.INFO, "increasing available senders."); - availableSenders.add(new DataSender(tmpSocketAddress)); - unUsedSocketAddresses.remove(tmpSocketAddress); - } catch (IOException e) { - - } - } - } - - if (availableSenders.size() >= availableSize) { - HealthCollector.getCurrentHeathReading(null).updateData(HeathReading.WARNING, "the num of available senders is not enough (" + availableSenders.size() + ")."); - } - - try { - Thread.sleep(CHECK_POLLING_TIME); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep Failure"); - } - } - } - } - - public static void unRegister(DataSender sender) { - synchronized (lock) { - availableSenders.remove(sender); - unUsedSocketAddresses.add(sender.getServerIp()); - } - } - -} diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactoryWithBalance.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactoryWithBalance.java index dea1708eecb5302c00b61bf2a8e9062cf4afff51..5979908f39eb17f940028c74c2e9433bb0bef878 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactoryWithBalance.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSenderFactoryWithBalance.java @@ -1,7 +1,11 @@ package com.ai.cloud.skywalking.sender; -import com.ai.cloud.skywalking.conf.Config; -import com.ai.cloud.skywalking.util.StringUtil; +import static com.ai.cloud.skywalking.conf.Config.Sender.CHECKER_THREAD_WAIT_INTERVAL; +import static com.ai.cloud.skywalking.conf.Config.Sender.CLOSE_SENDER_COUNTDOWN; +import static com.ai.cloud.skywalking.conf.Config.Sender.CONNECT_PERCENT; +import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_FIND_CONNECTION_SENDER; +import static com.ai.cloud.skywalking.conf.Config.Sender.RETRY_GET_SENDER_WAIT_INTERVAL; +import static com.ai.cloud.skywalking.conf.Config.Sender.SWITCH_SENDER_INTERVAL; import java.io.IOException; import java.net.InetSocketAddress; @@ -13,195 +17,200 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Level; import java.util.logging.Logger; -import static com.ai.cloud.skywalking.conf.Config.Sender.*; +import com.ai.cloud.skywalking.conf.Config; +import com.ai.cloud.skywalking.util.StringUtil; public class DataSenderFactoryWithBalance { - private static Logger logger = Logger.getLogger(DataSenderFactoryWithBalance.class.getName()); - // unUsedServerAddress存放没有使用的服务器地址, - private static List unusedServerAddresses = new ArrayList(); - - private static List usingDataSender = new ArrayList(); - private static int maxKeepConnectingSenderSize; - private static Object lock = new Object(); - private static boolean NEED_ADD_SENDER_FLAG = false; - - private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) { - if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) { - logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100"); - System.exit(-1); - } - return (int) Math.ceil(allAddressSize * ((1.0 * CONNECT_PERCENT / 100) % 100)); - } - - // 初始化服务端的地址数据 - static { - // 获取数据 - if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) { - throw new IllegalArgumentException("Collection service configuration error."); - } - - // 初始化地址 - Set tmpInetSocketAddress = new HashSet(); - for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) { - String[] server = serverConfig.split(":"); - if (server.length != 2) - throw new IllegalArgumentException("Collection service configuration error."); - tmpInetSocketAddress.add(new InetSocketAddress(server[0], Integer.valueOf(server[1]))); - } - - unusedServerAddresses.addAll(tmpInetSocketAddress); - - //根据配置的服务器集群的地址,来计算保持连接的Sender的数量 - maxKeepConnectingSenderSize = calculateMaxKeeperConnectingSenderSize(tmpInetSocketAddress.size()); - - - // 初始化的发送程序 - int index = 0; - while (usingDataSender.size() < maxKeepConnectingSenderSize) { - index = ThreadLocalRandom.current().nextInt(0, unusedServerAddresses.size()); - try { - usingDataSender.add(new DataSender(unusedServerAddresses.get(index))); - unusedServerAddresses.remove(index); - } catch (IOException e) { - // 服务器连接不上 - logger.log(Level.SEVERE, "Failed to connect server[" + - unusedServerAddresses.get(index).getHostName() + "]"); - continue; - } - } - - new DataSenderChecker().start(); - } - - // 获取连接 - - public static DataSender getSender() { - DataSender readySender = null; - while (true) { - int index = ThreadLocalRandom.current().nextInt(0, usingDataSender.size()); - if (usingDataSender.get(index).getStatus() == DataSender.SenderStatus.READY) { - readySender = usingDataSender.get(index); - break; - } - - if (readySender == null) { - try { - Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep failed"); - } - } - - } - - return readySender; - } - - // 定时Sender状态检查 - public static class DataSenderChecker extends Thread { - public DataSenderChecker() { - super("Data-Sender-Checker"); - } - - @Override - public void run() { - long sleepTime = 0; - while (true) { - // 检查是否需要新增 - // NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值 - if (NEED_ADD_SENDER_FLAG) { - DataSender newSender; - for (int i = 0; i < usingDataSender.size(); i++) { - if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) { - // 正在使用的Sender的数量 <= maxKeepConnectingSenderSize - // 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量 - // 可替换的服务器数量 = 剩余服务器地址数量 - // 当剩余服务器地址数量 <= 0 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思 - // 当剩余服务器地址数量 > 0 时, 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环 - while ((newSender = findReadySender()) == null) { - try { - Thread.sleep(RETRY_FIND_CONNECTION_SENDER); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep failed."); - } - } - usingDataSender.set(i, newSender); - unusedServerAddresses.add(usingDataSender.get(i).getServerIp()); - if (usingDataSender.size() >= maxKeepConnectingSenderSize) { - NEED_ADD_SENDER_FLAG = false; - break; - } - } - } - - - } - - // 检查是否需要替换 - if (sleepTime >= SWITCH_SENDER_INTERVAL) { - DataSender toBeSwitchSender; - DataSender tmpSender; - while (true) { - int toBeSwitchIndex = ThreadLocalRandom.current().nextInt(0, usingDataSender.size() - 1); - toBeSwitchSender = usingDataSender.get(toBeSwitchIndex); - if (toBeSwitchSender.getStatus() == DataSender.SenderStatus.READY) { - tmpSender = findReadySender(); - if (tmpSender != null) { - usingDataSender.set(toBeSwitchIndex, tmpSender); - try { - Thread.sleep(CLOSE_SENDER_COUNTDOWN); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep Failed"); - } - unusedServerAddresses.remove(tmpSender.getServerIp()); - unusedServerAddresses.add(toBeSwitchSender.getServerIp()); - } - break; - } - } - sleepTime = 0; - } - - // - sleepTime += CHECKER_THREAD_WAIT_INTERVAL; - try { - Thread.sleep(CHECKER_THREAD_WAIT_INTERVAL); - } catch (InterruptedException e) { - logger.log(Level.ALL, "Sleep failed"); - } - - } - } - - } - - private static DataSender findReadySender() { - DataSender result = null; - for (InetSocketAddress serverAddress : unusedServerAddresses) { - try { - result = new DataSender(serverAddress); - break; - } catch (IOException e) { - if (result != null) { - try { - result.closeConnect(); - } catch (IOException ex) { - logger.log(Level.ALL, "Failed to close socket[" + - serverAddress.getHostName() + "]"); - } - } - continue; - } - } - return result; - } - - public static void unRegister(DataSender socket) { - int index = usingDataSender.indexOf(socket); - if (index != -1) { - usingDataSender.get(index).setStatus(DataSender.SenderStatus.FAILED); - } - NEED_ADD_SENDER_FLAG = true; - } + private static Logger logger = Logger + .getLogger(DataSenderFactoryWithBalance.class.getName()); + // unUsedServerAddress存放没有使用的服务器地址, + private static List unusedServerAddresses = new ArrayList(); + + private static List usingDataSender = new ArrayList(); + private static int maxKeepConnectingSenderSize; + private static boolean NEED_ADD_SENDER_FLAG = false; + + private static int calculateMaxKeeperConnectingSenderSize(int allAddressSize) { + if (CONNECT_PERCENT <= 0 || CONNECT_PERCENT > 100) { + logger.log(Level.ALL, "CONNECT_PERCENT must between 1 and 100"); + System.exit(-1); + } + return (int) Math.ceil(allAddressSize + * ((1.0 * CONNECT_PERCENT / 100) % 100)); + } + + // 初始化服务端的地址数据 + static { + // 获取数据 + if (StringUtil.isEmpty(Config.Sender.SERVERS_ADDR)) { + throw new IllegalArgumentException( + "Collection service configuration error."); + } + + // 初始化地址 + Set tmpInetSocketAddress = new HashSet(); + for (String serverConfig : Config.Sender.SERVERS_ADDR.split(";")) { + String[] server = serverConfig.split(":"); + if (server.length != 2) + throw new IllegalArgumentException( + "Collection service configuration error."); + tmpInetSocketAddress.add(new InetSocketAddress(server[0], Integer + .valueOf(server[1]))); + } + + unusedServerAddresses.addAll(tmpInetSocketAddress); + + // 根据配置的服务器集群的地址,来计算保持连接的Sender的数量 + maxKeepConnectingSenderSize = calculateMaxKeeperConnectingSenderSize(tmpInetSocketAddress + .size()); + + // 初始化的发送程序 + int index = 0; + while (usingDataSender.size() < maxKeepConnectingSenderSize) { + index = ThreadLocalRandom.current().nextInt(0, + unusedServerAddresses.size()); + try { + usingDataSender.add(new DataSender(unusedServerAddresses + .get(index))); + unusedServerAddresses.remove(index); + } catch (IOException e) { + // 服务器连接不上 + logger.log(Level.SEVERE, "Failed to connect server[" + + unusedServerAddresses.get(index).getHostName() + "]"); + continue; + } + } + + new DataSenderChecker().start(); + } + + // 获取连接 + + public static DataSender getSender() { + DataSender readySender = null; + while (true) { + int index = ThreadLocalRandom.current().nextInt(0, + usingDataSender.size()); + if (usingDataSender.get(index).getStatus() == DataSender.SenderStatus.READY) { + readySender = usingDataSender.get(index); + break; + } + + if (readySender == null) { + try { + Thread.sleep(RETRY_GET_SENDER_WAIT_INTERVAL); + } catch (InterruptedException e) { + logger.log(Level.ALL, "Sleep failed"); + } + } + + } + + return readySender; + } + + // 定时Sender状态检查 + public static class DataSenderChecker extends Thread { + public DataSenderChecker() { + super("Data-Sender-Checker"); + } + + @Override + public void run() { + long sleepTime = 0; + while (true) { + // 检查是否需要新增 + // NEED_ADD_SENDER_FLAG 将会在unRegister方法修改值 + if (NEED_ADD_SENDER_FLAG) { + DataSender newSender; + for (int i = 0; i < usingDataSender.size(); i++) { + if (usingDataSender.get(i).getStatus() == DataSender.SenderStatus.FAILED) { + usingDataSender.get(i).close(); + // 正在使用的Sender的数量 <= maxKeepConnectingSenderSize + // 剩余的服务器地址数量 = 总得服务器地址数量 - 正在使用的Sender的数量 + // 可替换的服务器数量 = 剩余服务器地址数量 + // 当剩余服务器地址数量 <= 0 + // 时,可以替换的地址也不存在,替换操作就可以不执行,所以这里的while是这样的意思 + // 当剩余服务器地址数量 > 0 时, + // 就可以找到可以替换的地址,替换操作也就可以执行了,这里的就会跳出while循环 + while ((newSender = findReadySender()) == null) { + try { + Thread.sleep(RETRY_FIND_CONNECTION_SENDER); + } catch (InterruptedException e) { + logger.log(Level.ALL, "Sleep failed."); + } + } + usingDataSender.set(i, newSender); + unusedServerAddresses.add(usingDataSender.get(i) + .getServerIp()); + if (usingDataSender.size() >= maxKeepConnectingSenderSize) { + NEED_ADD_SENDER_FLAG = false; + break; + } + } + } + + } + + // 检查是否需要替换 + if (sleepTime >= SWITCH_SENDER_INTERVAL) { + DataSender toBeSwitchSender; + DataSender tmpSender; + int toBeSwitchIndex = ThreadLocalRandom.current().nextInt( + 0, usingDataSender.size() - 1); + toBeSwitchSender = usingDataSender.get(toBeSwitchIndex); + tmpSender = findReadySender(); + if (tmpSender != null) { + usingDataSender.set(toBeSwitchIndex, tmpSender); + try { + Thread.sleep(CLOSE_SENDER_COUNTDOWN); + } catch (InterruptedException e) { + logger.log(Level.ALL, "Sleep Failed"); + } + toBeSwitchSender.close(); + unusedServerAddresses.remove(tmpSender.getServerIp()); + unusedServerAddresses.add(toBeSwitchSender + .getServerIp()); + } + sleepTime = 0; + } + + // + sleepTime += CHECKER_THREAD_WAIT_INTERVAL; + try { + Thread.sleep(CHECKER_THREAD_WAIT_INTERVAL); + } catch (InterruptedException e) { + logger.log(Level.ALL, "Sleep failed"); + } + + } + } + + } + + private static DataSender findReadySender() { + DataSender result = null; + for (InetSocketAddress serverAddress : unusedServerAddresses) { + try { + result = new DataSender(serverAddress); + break; + } catch (IOException e) { + if (result != null) { + result.close(); + } + continue; + } + } + return result; + } + + public static void unRegister(DataSender socket) { + int index = usingDataSender.indexOf(socket); + if (index != -1) { + usingDataSender.get(index) + .setStatus(DataSender.SenderStatus.FAILED); + } + NEED_ADD_SENDER_FLAG = true; + } } diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/util/TraceIdGenerator.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/util/TraceIdGenerator.java index f122516a893e6026d9799290249ddacf1a0343ea..61ea2fe51d94300c7d38cd650aaa6a6ffd48bbb1 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/util/TraceIdGenerator.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/util/TraceIdGenerator.java @@ -1,7 +1,5 @@ package com.ai.cloud.skywalking.util; -import com.ai.cloud.skywalking.conf.Config; - import java.util.UUID; public final class TraceIdGenerator {