提交 7ae6bd6c 编写于 作者: A ascrutae

Merge remote-tracking branch 'origin/master'

package com.ai.cloud.skywalking.sender;
import com.ai.cloud.skywalking.util.ProtocolPackager;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
......@@ -75,14 +76,10 @@ public class DataSender implements IDataSender {
public boolean send(String data) {
try {
if (channel != null && channel.isActive()) {
// 对协议格式进行修改
// | check sum(4 byte) | data
byte[] dataArray = new byte[data.getBytes().length + 4];
System.arraycopy(dataArray,0, dataArray,4, dataArray.length);
byte[] checkSumArray = generateChecksum(data);
System.arraycopy(checkSumArray,0, dataArray,0, checkSumArray.length);
channel.writeAndFlush(dataArray);
byte[] dataPackage = ProtocolPackager.pack(data.getBytes());
channel.writeAndFlush(dataPackage);
SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.INFO, "DataSender[" + socketAddress + "] send data successfully.");
return true;
}else{
......@@ -97,30 +94,6 @@ public class DataSender implements IDataSender {
return false;
}
/**
* 生成校验和参数
* @param data
* @return
*/
private byte[] generateChecksum(String data) {
char[] dataArray = data.toCharArray();
int result = dataArray[0];
for (int i = 0; i < dataArray.length; i++) {
result ^= dataArray[i];
}
return intToBytes(result);
}
private byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) ((value >> 24) & 0xFF);
src[1] = (byte) ((value >> 16) & 0xFF);
src[2] = (byte) ((value >> 8) & 0xFF);
src[3] = (byte) (value & 0xFF);
return src;
}
public InetSocketAddress getServerAddr() {
return this.socketAddress;
}
......
......@@ -4,7 +4,7 @@ import org.junit.Test;
public class CheckSumTest {
private static final int dataIndex = 2;
private static final int MAX_TEST_COUNT = 100000;
private static final int MAX_TEST_COUNT = 100_000_00;
@Test
public void TestAllXORSum() {
......
......@@ -4,7 +4,7 @@
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-server</artifactId>
<version>1.0b</version>
<version>1.0-Final</version>
<packaging>jar</packaging>
<name>skywalking-server</name>
......
......@@ -10,10 +10,10 @@ public class Config {
public static int MAX_DEAL_DATA_THREAD_NUMBER = 3;
// 异常数据的时间间隔
public static int EXCEPTION_DATA_SENDING_INTERVAL = 5 * 60;
public static int FAILED_PACKAGE_WATCHING_TIME_WINDOW = 5 * 60;
// 时间间隔内最大异常数据次数
public static int MAX_SEND_EXCEPTION_DATA_COUNT = 200;
public static int MAX_WATCHING_FAILED_PACKAGE_SIZE = 200;
}
// 数据缓存配置类
......
......@@ -3,12 +3,12 @@ package com.ai.cloud.skywalking.reciever.handler;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.storage.AlarmRedisConnector;
import com.ai.cloud.skywalking.util.ProtocolPackager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.Arrays;
public class CollectionServerDataHandler extends SimpleChannelInboundHandler<byte[]> {
......@@ -17,55 +17,30 @@ public class CollectionServerDataHandler extends SimpleChannelInboundHandler<byt
Thread.currentThread().setName("ServerReceiver");
// 当接受到这条消息的是空,则忽略
if (msg != null && msg.length >= 0 && msg.length < Config.DataPackage.MAX_DATA_PACKAGE) {
// | check sum(4 byte) | data |
byte[] originCheckSum = new byte[4];
System.arraycopy(msg, 0, originCheckSum, 0, 4);
// 对协议进行拆包
String data = new String(msg, 4, msg.length - 4);
// 计算校验和
byte[] checkSum = generateChecksum(data);
if (Arrays.equals(originCheckSum, checkSum)) {
DataBufferThreadContainer.getDataBufferThread().saveTemporarily(data.getBytes());
} else {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().localAddress();
String key = ctx.name() + "-" + socketAddress.getHostName() + ":" + socketAddress.getPort();
Jedis jedis = AlarmRedisConnector.getJedis();
// 如果不存在,则置为0,并且设置生失效时间
if (jedis.setnx(key, 0 + "") == 1) {
jedis.expire(key, Config.Server.EXCEPTION_DATA_SENDING_INTERVAL);
}
if (Config.Server.MAX_SEND_EXCEPTION_DATA_COUNT > jedis.incr(key)) {
ctx.channel().close();
}
byte[] data = ProtocolPackager.unpack(msg);
if (data != null) {
DataBufferThreadContainer.getDataBufferThread().saveTemporarily(data);
} else {
// 处理错误包
dealFailedPackage(ctx);
}
}
}
/**
* 生成校验和参数
*
* @param data
* @return
*/
private byte[] generateChecksum(String data) {
char[] dataArray = data.toCharArray();
int result = dataArray[0];
for (int i = 0; i < dataArray.length; i++) {
result ^= dataArray[i];
private void dealFailedPackage(ChannelHandlerContext ctx) {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().localAddress();
String key = ctx.name() + "-" + socketAddress.getHostName() + ":" + socketAddress.getPort();
Jedis jedis = AlarmRedisConnector.getJedis();
if (jedis.setnx(key, 0 + "") == 1) {
jedis.expire(key, Config.Server.FAILED_PACKAGE_WATCHING_TIME_WINDOW);
}
return intToBytes(result);
if (Config.Server.MAX_WATCHING_FAILED_PACKAGE_SIZE > jedis.incr(key)) {
ctx.channel().close();
}
}
private byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) ((value >> 24) & 0xFF);
src[1] = (byte) ((value >> 16) & 0xFF);
src[2] = (byte) ((value >> 8) & 0xFF);
src[3] = (byte) (value & 0xFF);
return src;
}
}
#采集服务器的端口
server.port=34000
server.max_deal_data_thread_number=5
server.exception_data_sending_interval=300
server.max_send_exception_data_count=200;
server.failed_package_watching_time_windowss=300
server.max_watching_failed_package_size=200;
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册