diff --git a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java index faa4d636aa409d9687e60bdb0f777f0e8a6b0928..a65d61b114d7c8325cf402a93b09ad3b4c9c0135 100644 --- a/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java +++ b/skywalking-api/src/main/java/com/ai/cloud/skywalking/sender/DataSender.java @@ -1,5 +1,6 @@ package com.ai.cloud.skywalking.sender; +import com.ai.cloud.skywalking.util.ProtocolPackager; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -75,14 +76,10 @@ public class DataSender implements IDataSender { public boolean send(String data) { try { if (channel != null && channel.isActive()) { - // 对协议格式进行修改 - // | check sum(4 byte) | data - byte[] dataArray = new byte[data.getBytes().length + 4]; - System.arraycopy(dataArray,0, dataArray,4, dataArray.length); - byte[] checkSumArray = generateChecksum(data); - System.arraycopy(checkSumArray,0, dataArray,0, checkSumArray.length); - channel.writeAndFlush(dataArray); + byte[] dataPackage = ProtocolPackager.pack(data.getBytes()); + channel.writeAndFlush(dataPackage); + SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.INFO, "DataSender[" + socketAddress + "] send data successfully."); return true; }else{ @@ -97,30 +94,6 @@ public class DataSender implements IDataSender { return false; } - /** - * 生成校验和参数 - * @param data - * @return - */ - private byte[] generateChecksum(String data) { - char[] dataArray = data.toCharArray(); - int result = dataArray[0]; - for (int i = 0; i < dataArray.length; i++) { - result ^= dataArray[i]; - } - - return intToBytes(result); - } - - private byte[] intToBytes(int value) { - byte[] src = new byte[4]; - src[0] = (byte) ((value >> 24) & 0xFF); - src[1] = (byte) ((value >> 16) & 0xFF); - src[2] = (byte) ((value >> 8) & 0xFF); - src[3] = (byte) (value & 0xFF); - return src; - } - public InetSocketAddress getServerAddr() { return this.socketAddress; } diff --git a/skywalking-api/src/test/java/test/ai/cloud/checksum/CheckSumTest.java b/skywalking-api/src/test/java/test/ai/cloud/checksum/CheckSumTest.java index 01bf07e25e07977e9a5723a245a1c99002aa18d4..03a166584dddb5281d402f2cf0d958a30e45960d 100644 --- a/skywalking-api/src/test/java/test/ai/cloud/checksum/CheckSumTest.java +++ b/skywalking-api/src/test/java/test/ai/cloud/checksum/CheckSumTest.java @@ -4,7 +4,7 @@ import org.junit.Test; public class CheckSumTest { private static final int dataIndex = 2; - private static final int MAX_TEST_COUNT = 100000; + private static final int MAX_TEST_COUNT = 100_000_00; @Test public void TestAllXORSum() { diff --git a/skywalking-server/pom.xml b/skywalking-server/pom.xml index 90b56db1b90a6240498f1fefb18d131428322aa6..12b996229506ab61219c0e4f10a9ac7b36a584f6 100644 --- a/skywalking-server/pom.xml +++ b/skywalking-server/pom.xml @@ -4,7 +4,7 @@ com.ai.cloud skywalking-server - 1.0b + 1.0-Final jar skywalking-server diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/conf/Config.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/conf/Config.java index 98e7c6683e6ae839843875524e254fc4603e1586..2ac53e3315ed7193694be5cf93d1655cb25ca7b3 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/conf/Config.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/conf/Config.java @@ -10,10 +10,10 @@ public class Config { public static int MAX_DEAL_DATA_THREAD_NUMBER = 3; // 异常数据的时间间隔 - public static int EXCEPTION_DATA_SENDING_INTERVAL = 5 * 60; + public static int FAILED_PACKAGE_WATCHING_TIME_WINDOW = 5 * 60; // 时间间隔内最大异常数据次数 - public static int MAX_SEND_EXCEPTION_DATA_COUNT = 200; + public static int MAX_WATCHING_FAILED_PACKAGE_SIZE = 200; } // 数据缓存配置类 diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/handler/CollectionServerDataHandler.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/handler/CollectionServerDataHandler.java index 6775f92e4bbb22a7d77efe01f3e54eb7c84c0538..41520da398be2d99ad7dbc6d0b2b2aae335ba347 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/handler/CollectionServerDataHandler.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/handler/CollectionServerDataHandler.java @@ -3,12 +3,12 @@ package com.ai.cloud.skywalking.reciever.handler; import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer; import com.ai.cloud.skywalking.reciever.conf.Config; import com.ai.cloud.skywalking.reciever.storage.AlarmRedisConnector; +import com.ai.cloud.skywalking.util.ProtocolPackager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import redis.clients.jedis.Jedis; import java.net.InetSocketAddress; -import java.util.Arrays; public class CollectionServerDataHandler extends SimpleChannelInboundHandler { @@ -17,55 +17,30 @@ public class CollectionServerDataHandler extends SimpleChannelInboundHandler= 0 && msg.length < Config.DataPackage.MAX_DATA_PACKAGE) { - // | check sum(4 byte) | data | - byte[] originCheckSum = new byte[4]; - System.arraycopy(msg, 0, originCheckSum, 0, 4); - // 对协议进行拆包 - String data = new String(msg, 4, msg.length - 4); - // 计算校验和 - byte[] checkSum = generateChecksum(data); - if (Arrays.equals(originCheckSum, checkSum)) { - DataBufferThreadContainer.getDataBufferThread().saveTemporarily(data.getBytes()); - } else { - InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().localAddress(); - String key = ctx.name() + "-" + socketAddress.getHostName() + ":" + socketAddress.getPort(); - Jedis jedis = AlarmRedisConnector.getJedis(); - // 如果不存在,则置为0,并且设置生失效时间 - if (jedis.setnx(key, 0 + "") == 1) { - jedis.expire(key, Config.Server.EXCEPTION_DATA_SENDING_INTERVAL); - } - - if (Config.Server.MAX_SEND_EXCEPTION_DATA_COUNT > jedis.incr(key)) { - ctx.channel().close(); - } + byte[] data = ProtocolPackager.unpack(msg); + if (data != null) { + DataBufferThreadContainer.getDataBufferThread().saveTemporarily(data); + } else { + // 处理错误包 + dealFailedPackage(ctx); } } } - /** - * 生成校验和参数 - * - * @param data - * @return - */ - private byte[] generateChecksum(String data) { - char[] dataArray = data.toCharArray(); - int result = dataArray[0]; - for (int i = 0; i < dataArray.length; i++) { - result ^= dataArray[i]; + private void dealFailedPackage(ChannelHandlerContext ctx) { + InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().localAddress(); + String key = ctx.name() + "-" + socketAddress.getHostName() + ":" + socketAddress.getPort(); + Jedis jedis = AlarmRedisConnector.getJedis(); + if (jedis.setnx(key, 0 + "") == 1) { + jedis.expire(key, Config.Server.FAILED_PACKAGE_WATCHING_TIME_WINDOW); } - return intToBytes(result); + if (Config.Server.MAX_WATCHING_FAILED_PACKAGE_SIZE > jedis.incr(key)) { + ctx.channel().close(); + } } - private byte[] intToBytes(int value) { - byte[] src = new byte[4]; - src[0] = (byte) ((value >> 24) & 0xFF); - src[1] = (byte) ((value >> 16) & 0xFF); - src[2] = (byte) ((value >> 8) & 0xFF); - src[3] = (byte) (value & 0xFF); - return src; - } + } diff --git a/skywalking-server/src/main/resources/config.properties b/skywalking-server/src/main/resources/config.properties index 693a953461da368eabbbd9533d659c45b67888aa..2090f0042d245f2d3d6fd9198aa1da7c161725fe 100644 --- a/skywalking-server/src/main/resources/config.properties +++ b/skywalking-server/src/main/resources/config.properties @@ -1,8 +1,8 @@ #采集服务器的端口 server.port=34000 server.max_deal_data_thread_number=5 -server.exception_data_sending_interval=300 -server.max_send_exception_data_count=200; +server.failed_package_watching_time_windowss=300 +server.max_watching_failed_package_size=200; #每个线程最大缓存数量 buffer.per_thread_max_buffer_number=1024