提交 c1d3407d 编写于 作者: A ascrutae

完善skywalking协议格式,并完善服务端接受数据的安全性

上级 593e053c
......@@ -18,6 +18,7 @@ import io.netty.handler.codec.bytes.ByteArrayEncoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
......@@ -74,7 +75,14 @@ public class DataSender implements IDataSender {
public boolean send(String data) {
try {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(data.getBytes());
// 对协议格式进行修改
// | check sum(4 byte) | data
byte[] dataArray = new byte[data.getBytes().length + 4];
System.arraycopy(dataArray,0, dataArray,4, dataArray.length);
byte[] checkSumArray = generateChecksum(data);
System.arraycopy(checkSumArray,0, dataArray,0, checkSumArray.length);
channel.writeAndFlush(dataArray);
SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.INFO, "DataSender[" + socketAddress + "] send data successfully.");
return true;
}else{
......@@ -89,6 +97,30 @@ public class DataSender implements IDataSender {
return false;
}
/**
* 生成校验和参数
* @param data
* @return
*/
private byte[] generateChecksum(String data) {
char[] dataArray = data.toCharArray();
int result = dataArray[0];
for (int i = 0; i < dataArray.length; i++) {
result ^= dataArray[i];
}
return intToBytes(result);
}
private byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) ((value >> 24) & 0xFF);
src[1] = (byte) ((value >> 16) & 0xFF);
src[2] = (byte) ((value >> 8) & 0xFF);
src[3] = (byte) (value & 0xFF);
return src;
}
public InetSocketAddress getServerAddr() {
return this.socketAddress;
}
......
......@@ -4,7 +4,7 @@
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-server</artifactId>
<version>1.0-Final</version>
<version>1.0b</version>
<packaging>jar</packaging>
<name>skywalking-server</name>
......@@ -14,6 +14,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<url>https://jcenter.bintray.com</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
......@@ -44,7 +51,7 @@
<dependency>
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-protocol</artifactId>
<version>1.0-Final</version>
<version>1.0b</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
......
......@@ -8,6 +8,12 @@ public class Config {
public static int PORT = 34000;
// 最大数据处理线程数量
public static int MAX_DEAL_DATA_THREAD_NUMBER = 3;
// 异常数据的时间间隔
public static int EXCEPTION_DATA_SENDING_INTERVAL = 5 * 60;
// 时间间隔内最大异常数据次数
public static int MAX_SEND_EXCEPTION_DATA_COUNT = 200;
}
// 数据缓存配置类
......@@ -86,19 +92,22 @@ public class Config {
public static String STORAGE_TYPE = "hbase";
}
public static class Alarm {
public static int ALARM_EXPIRE_SECONDS = 1000 * 60 * 90;
public static int ALARM_EXCEPTION_STACK_LENGTH = 300;
public static class Redis{
public static String REDIS_SERVER = "127.0.0.1:6379";
public static String REDIS_SERVER = "10.1.241.18:16379";
public static int REDIS_MAX_IDLE = 10;
public static int REDIS_MIN_IDLE = 1;
public static int REDIS_MAX_TOTAL = 20;
}
public static class Alarm {
public static int ALARM_EXPIRE_SECONDS = 1000 * 60 * 90;
public static int ALARM_EXCEPTION_STACK_LENGTH = 300;
public static boolean ALARM_OFF_FLAG = false;
......@@ -115,4 +124,5 @@ public class Config {
// 默认健康检查上报时间
public static long REPORT_INTERVAL = 5 * 60 * 1000L;
}
}
\ No newline at end of file
......@@ -2,8 +2,13 @@ package com.ai.cloud.skywalking.reciever.handler;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.storage.AlarmRedisConnector;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.Arrays;
public class CollectionServerDataHandler extends SimpleChannelInboundHandler<byte[]> {
......@@ -12,7 +17,56 @@ public class CollectionServerDataHandler extends SimpleChannelInboundHandler<byt
Thread.currentThread().setName("ServerReceiver");
// 当接受到这条消息的是空,则忽略
if (msg != null && msg.length >= 0 && msg.length < Config.DataPackage.MAX_DATA_PACKAGE) {
DataBufferThreadContainer.getDataBufferThread().saveTemporarily(msg);
// | check sum(4 byte) | data |
byte[] originCheckSum = new byte[4];
System.arraycopy(msg, 0, originCheckSum, 0, 4);
// 对协议进行拆包
String data = new String(msg, 4, msg.length - 4);
// 计算校验和
byte[] checkSum = generateChecksum(data);
if (Arrays.equals(originCheckSum, checkSum)) {
DataBufferThreadContainer.getDataBufferThread().saveTemporarily(data.getBytes());
} else {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().localAddress();
String key = socketAddress.getHostName() + ":" + socketAddress.getPort();
Jedis jedis = AlarmRedisConnector.getJedis();
// 如果不存在,则置为0,并且设置生失效时间
if (jedis.setnx(key, 0 + "") == 1) {
jedis.expire(key, Config.Server.EXCEPTION_DATA_SENDING_INTERVAL);
}
if (Config.Server.MAX_SEND_EXCEPTION_DATA_COUNT > jedis.incr(key)) {
ctx.channel().close();
}
}
}
}
/**
* 生成校验和参数
*
* @param data
* @return
*/
private byte[] generateChecksum(String data) {
char[] dataArray = data.toCharArray();
int result = dataArray[0];
for (int i = 0; i < dataArray.length; i++) {
result ^= dataArray[i];
}
return intToBytes(result);
}
private byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) ((value >> 24) & 0xFF);
src[1] = (byte) ((value >> 16) & 0xFF);
src[2] = (byte) ((value >> 8) & 0xFF);
src[3] = (byte) (value & 0xFF);
return src;
}
}
......@@ -46,7 +46,7 @@ public class AlarmRedisConnector {
public RedisInspector() {
super("RedisInspectorThread");
String redisServerConfig = Config.Alarm.REDIS_SERVER;
String redisServerConfig = Config.Redis.REDIS_SERVER;
if (redisServerConfig == null || redisServerConfig.length() <= 0) {
logger.error("Redis server is not setting. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
......@@ -75,7 +75,7 @@ public class AlarmRedisConnector {
needConnectInit = false;
} catch (Exception e) {
logger.error("can't connect to redis["
+ Config.Alarm.REDIS_SERVER + "]", e);
+ Config.Redis.REDIS_SERVER + "]", e);
} finally {
if (jedis != null) {
jedis.close();
......@@ -119,9 +119,9 @@ public class AlarmRedisConnector {
private GenericObjectPoolConfig buildGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestOnBorrow(true);
genericObjectPoolConfig.setMaxIdle(Config.Alarm.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Alarm.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Alarm.REDIS_MAX_TOTAL);
genericObjectPoolConfig.setMaxIdle(Config.Redis.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Redis.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Redis.REDIS_MAX_TOTAL);
return genericObjectPoolConfig;
}
}
......
#采集服务器的端口
server.port=34000
server.max_deal_data_thread_number=5
server.exception_data_sending_interval=300
server.max_send_exception_data_count=200;
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
......@@ -51,19 +53,20 @@ hbaseconfig.client_port=29181
#告警失效时间
alarm.alarm_expire_seconds=5400
#Redis配置
alarm.redis_server=10.1.241.18:16379
#Redis最大空闲数量
alarm.edis_max_idle=10
#Redis最小空闲数量
alarm.edis_min_idle=1
#Redis最大个数
alarm.edis_max_total=20
#是否关闭告警
alarm.larm_off_flag=false
#告警redis检测器检测周期
alarm.alarm_redis_inspector_interval=5000
#Redis配置
redis.redis_server=10.1.241.18:16379
#Redis最大空闲数量
redis.edis_max_idle=10
#Redis最小空闲数量
redis.edis_min_idle=1
#Redis最大个数
redis.edis_max_total=20
#告警检查器:异常告警检查
alarm.checker.turn_on_exception_checker=true
#告警检查器:执行时间超时告警检查
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册