提交 88d678e6 编写于 作者: Z zhangxin10

1. 将skywalking-api配置挪到skywalking-config

2. 添加如果没有skywalking没有配置时不工作
上级 c496b893
......@@ -5,7 +5,10 @@
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-api</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<modules>
<module>../skywalking-config</module>
</modules>
<packaging>pom</packaging>
<name>skywalking-api</name>
<url>http://maven.apache.org</url>
......@@ -21,6 +24,11 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-config</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
......
......@@ -7,23 +7,25 @@ import com.ai.cloud.skywalking.sender.DataSenderFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static com.ai.cloud.skywalking.buffer.config.BufferConfig.*;
import static com.ai.cloud.skywalking.conf.Config.Buffer.BUFFER_MAX_SIZE;
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_CONSUMER;
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_WAIT_TIME;
import static com.ai.cloud.skywalking.conf.Config.Sender.MAX_BUFFER_DATA_SIZE;
public class BufferGroup {
public static CountDownLatch count;
private String groupName;
private Span[] dataBuffer = new Span[GROUP_MAX_SIZE];
private Span[] dataBuffer = new Span[BUFFER_MAX_SIZE];
AtomicInteger index = new AtomicInteger(0);
public BufferGroup(String groupName) {
this.groupName = groupName;
int step = (int) Math.ceil(GROUP_MAX_SIZE * 1.0 / MAX_WORKER);
int step = (int) Math.ceil(BUFFER_MAX_SIZE * 1.0 / MAX_CONSUMER);
int start = 0, end = 0;
while (true) {
if (end + step >= GROUP_MAX_SIZE){
new ConsumerWorker(start, GROUP_MAX_SIZE).start();
if (end + step >= BUFFER_MAX_SIZE) {
new ConsumerWorker(start, BUFFER_MAX_SIZE).start();
break;
}
end += step;
......@@ -33,7 +35,7 @@ public class BufferGroup {
}
public void save(Span span) {
int i = Math.abs(index.getAndIncrement() % GROUP_MAX_SIZE);
int i = Math.abs(index.getAndIncrement() % BUFFER_MAX_SIZE);
if (dataBuffer[i] != null) {
// TODO 需要上报
System.out.println(span.getLevelId() + "在Group[" + groupName + "]的第" + i + "位冲突");
......@@ -43,7 +45,7 @@ public class BufferGroup {
class ConsumerWorker extends Thread {
private int start = 0;
private int end = GROUP_MAX_SIZE;
private int end = BUFFER_MAX_SIZE;
private StringBuilder builder = new StringBuilder();
private ConsumerWorker(int start, int end) {
......@@ -57,27 +59,26 @@ public class BufferGroup {
@Override
public void run() {
int index = 0;
StringBuilder data = new StringBuilder();
while (true) {
boolean bool = false;
StringBuilder data = new StringBuilder();
for (int i = start; i < end; i++) {
if (dataBuffer[i] == null) {
continue;
}
bool = true;
data.append(dataBuffer[i] + ";");
dataBuffer[i++] = null;
if (i == SEND_MAX_SIZE) {
// TODO 发送失败了怎么办?
data.append(dataBuffer[i]);
dataBuffer[i] = null;
if (index++ == MAX_BUFFER_DATA_SIZE) {
DataSenderFactory.getSender().send(data.toString());
i = 0;
index = 0;
data = new StringBuilder();
}
}
if (!bool) {
try {
Thread.sleep(5L);
Thread.sleep(MAX_WAIT_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
......
......@@ -4,19 +4,19 @@ import com.ai.cloud.skywalking.context.Span;
import java.util.concurrent.ThreadLocalRandom;
import static com.ai.cloud.skywalking.buffer.config.BufferConfig.POOL_MAX_SIZE;
import static com.ai.cloud.skywalking.conf.Config.Buffer.POOL_MAX_LENGTH;
class BufferPool {
private static BufferGroup[] bufferGroups = new BufferGroup[POOL_MAX_SIZE];
private static BufferGroup[] bufferGroups = new BufferGroup[POOL_MAX_LENGTH];
static {
for (int i = 0; i < POOL_MAX_SIZE; i++) {
for (int i = 0; i < POOL_MAX_LENGTH; i++) {
bufferGroups[i] = new BufferGroup("BufferLine-" + i);
}
}
public void save(Span span) {
bufferGroups[ThreadLocalRandom.current().nextInt(0, POOL_MAX_SIZE)].save(span);
bufferGroups[ThreadLocalRandom.current().nextInt(0, POOL_MAX_LENGTH)].save(span);
}
}
package com.ai.cloud.skywalking.buffer;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.conf.ConfigInitializer;
import com.ai.cloud.skywalking.context.Span;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class ContextBuffer {
static BufferPool pool = new BufferPool();
private static boolean isAuth = true;
static {
InputStream inputStream = ContextBuffer.class.getResourceAsStream("/sky-walking.auth");
if (inputStream == null) {
isAuth = false;
}
if (isAuth) {
try {
Properties properties = new Properties();
properties.load(inputStream);
ConfigInitializer.initialize(properties, Config.class);
} catch (IllegalAccessException e) {
isAuth = false;
} catch (IOException e) {
isAuth = false;
}
ContextBuffer.init();
}
}
private static BufferPool pool;
private ContextBuffer() {
//non
}
private static void init() {
if (pool == null)
pool = new BufferPool();
}
public static void save(Span span) {
if (!isAuth)
return;
pool.save(span);
}
}
package com.ai.cloud.skywalking.conf;
public class Config {
public static class Consumer {
//最大消费线程数
public static int MAX_CONSUMER = 2;
//消费者最大等待时间
public static long MAX_WAIT_TIME = 5L;
}
public static class Buffer {
// 每个Buffer的最大个数
public static int BUFFER_MAX_SIZE = 18000;
// Buffer池的最大长度
public static int POOL_MAX_LENGTH = 5;
}
public static class Sender {
// 发送的最大条数
public static int MAX_BUFFER_DATA_SIZE = 1;
// 最大发送者的连接数阀比例
public static int SEND_CONNECTION_THRESHOLD = 2;
// 发送服务端配置
public static String SENDER_SERVERS;
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.conf;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;
import java.util.Properties;
public class ConfigInitializer {
public static void initialize(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
initNextLevel(properties, rootConfigType, new ConfigDesc());
}
private static void initNextLevel(Properties properties, Class<?> recentConfigType, ConfigDesc parentDesc) throws NumberFormatException, IllegalArgumentException, IllegalAccessException {
for (Field field : recentConfigType.getFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
String configKey = (parentDesc + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
}
}
}
for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
parentDesc.append(innerConfiguration.getSimpleName());
initNextLevel(properties, innerConfiguration, parentDesc);
parentDesc.removeLastDesc();
}
}
}
class ConfigDesc {
private LinkedList<String> descs = new LinkedList<String>();
void append(String currentDesc) {
descs.addLast(currentDesc);
}
void removeLastDesc() {
descs.removeLast();
}
@Override
public String toString() {
if (descs.size() == 0) {
return "";
}
StringBuilder ret = new StringBuilder(descs.getFirst());
boolean first = true;
for (String desc : descs) {
if (first) {
first = false;
continue;
}
ret.append(".").append(desc);
}
return ret.toString();
}
}
......@@ -5,7 +5,7 @@ public class SendData {
private String URI;
private String businessKey;
private SendData() {
public SendData() {
//Non
}
......@@ -21,14 +21,14 @@ public class SendData {
return businessKey;
}
public static BaseSendDataBuilder newBuilder() {
return new BaseSendDataBuilder();
public static SendDataBuilder newBuilder() {
return new SendDataBuilder();
}
public static class BaseSendDataBuilder {
public static class SendDataBuilder {
private SendData sendData;
BaseSendDataBuilder() {
SendDataBuilder() {
sendData = new SendData();
}
......@@ -36,17 +36,17 @@ public class SendData {
return sendData;
}
public BaseSendDataBuilder viewPoint(String viewPoint) {
public SendDataBuilder viewPoint(String viewPoint) {
sendData.viewPoint = viewPoint;
return this;
}
public BaseSendDataBuilder URI(String uri) {
public SendDataBuilder URI(String uri) {
sendData.URI = uri;
return this;
}
public BaseSendDataBuilder businessKey(String businessKey) {
public SendDataBuilder businessKey(String businessKey) {
sendData.businessKey = businessKey;
return this;
}
......
package com.ai.cloud.skywalking.sender;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.util.StringUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
......@@ -8,6 +11,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static com.ai.cloud.skywalking.conf.Config.Sender.SEND_CONNECTION_THRESHOLD;
public class DataSenderFactory {
......@@ -16,23 +20,40 @@ public class DataSenderFactory {
private static List<DataSender> availableSenders = new ArrayList<DataSender>();
static {
socketAddresses.add(new InetSocketAddress("10.1.235.197", 34000));
socketAddresses.add(new InetSocketAddress("10.1.235.197", 35000));
try {
if (StringUtil.isEmpty(Config.Sender.SENDER_SERVERS)) {
throw new IllegalArgumentException("Collection service configuration error.");
}
for (String serverConfig : Config.Sender.SENDER_SERVERS.split(";")) {
String[] server = serverConfig.split(":");
if (server.length != 2)
throw new IllegalArgumentException("Collection service configuration error.");
socketAddresses.add(new InetSocketAddress(server[0], Integer.valueOf(server[1])));
}
} catch (Exception e) {
System.err.print("Collection service configuration error.");
System.exit(-1);
}
new DataSenderMaker().start();
}
public static DataSender getSender() {
return availableSenders.get(ThreadLocalRandom.current().nextInt(availableSenders.size()));
return availableSenders.get(ThreadLocalRandom.current().nextInt(0, availableSenders.size()));
}
static class DataSenderMaker extends Thread {
private int avaiableSize = (int) Math.ceil(socketAddresses.size() * 1.0 / SEND_CONNECTION_THRESHOLD);
public DataSenderMaker() {
// 初始化DataSender
Iterator<SocketAddress> it = socketAddresses.iterator();
List<SocketAddress> usedSocketAddress = new ArrayList<SocketAddress>();
for (SocketAddress socketAddress : socketAddresses) {
if (availableSenders.size() >= socketAddresses.size() / 2) {
if (availableSenders.size() >= avaiableSize) {
break;
}
try {
......@@ -51,7 +72,7 @@ public class DataSenderFactory {
while (true) {
//当可用的Sender的数量和保存的地址的比例不在1:2,则不创建
for (SocketAddress socketAddress : unUsedSocketAddresses) {
if (availableSenders.size() >= socketAddresses.size() / 2) {
if (availableSenders.size() >= avaiableSize) {
break;
}
try {
......
......@@ -21,13 +21,9 @@ public class SpanBufferTest {
private int sizeCount = 0;
private String fileName = "d:\\test-data.txt";
public SpanBufferTest(int threadSize, int sizeCount, int poolSize, int groupSize, int workerSize) {
public SpanBufferTest(int threadSize, int sizeCount) {
this.threadSize = threadSize;
this.sizeCount = sizeCount;
BufferGroup.count = new CountDownLatch(threadSize * sizeCount);
BufferConfig.MAX_WORKER = workerSize;
BufferConfig.GROUP_MAX_SIZE = groupSize;
BufferConfig.POOL_MAX_SIZE = poolSize;
}
public int getThreadSize() {
......@@ -41,7 +37,7 @@ public class SpanBufferTest {
@Parameterized.Parameters
public static Collection<Integer[]> getParams() {
return Arrays.asList(new Integer[][]{
{2000, 100000, 5, 30000, 3},
{2000, 10},
// {2000, 100000, 5, 27000, 3},
// {2000, 100000, 5, 24000, 3},
// {2000, 100000, 5, 20000, 2},
......@@ -69,9 +65,7 @@ public class SpanBufferTest {
new ContextBufferThread(countDownLatch, sizeCount).start();
}
countDownLatch.await();
long end = System.currentTimeMillis() - start;
CountDownLatch countDownLatchA = new CountDownLatch(threadSize);
start = System.currentTimeMillis();
sleepTime = 1000;
for (int i = 0; i < threadSize; i++) {
if (i % 100 == 0) {
......@@ -84,33 +78,5 @@ public class SpanBufferTest {
new ContextBufferThreadA(countDownLatchA, sizeCount).start();
}
countDownLatchA.await();
long endA = System.currentTimeMillis() - start;
System.out.print("执行完毕!");
StringBuilder builder = new StringBuilder();
builder.append(threadSize + "\t");
builder.append(sizeCount + "\t");
builder.append(BufferConfig.MAX_WORKER + "\t");
builder.append(BufferConfig.GROUP_MAX_SIZE + "\t");
builder.append(BufferConfig.POOL_MAX_SIZE + "\t");
builder.append("1 ms/1\t");
builder.append((sizeCount * threadSize * 1.0 * 1000) / (end) + "\t");
builder.append(((end - endA) * 1.0 / end) + "\t");
builder.append(BufferGroup.count.getCount() + "\t" + (BufferGroup.count.getCount() == 0) + "\t\n");
appendMethodA(fileName, builder.toString());
System.out.println("结果输出成功");
assertEquals(1, 1);
}
private void appendMethodA(String fileName, String content) {
try {
RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
long fileLength = randomFile.length();
randomFile.seek(fileLength);
randomFile.writeBytes(content);
randomFile.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.sender;
import com.ai.cloud.skywalking.buffer.config.BufferConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -14,19 +13,15 @@ public class SenderTest {
private int countSize;
private int threadSize;
public SenderTest(int threadSize, int countSize, int poolSize, int groupSize, int workerSize, int sendSize) {
public SenderTest(int threadSize, int countSize) {
this.threadSize = threadSize;
this.countSize = countSize;
BufferConfig.MAX_WORKER = workerSize;
BufferConfig.GROUP_MAX_SIZE = groupSize;
BufferConfig.POOL_MAX_SIZE = poolSize;
BufferConfig.SEND_MAX_SIZE = sendSize;
}
@Parameterized.Parameters
public static Collection<Integer[]> getParams() {
return Arrays.asList(new Integer[][]{
{10, 100, 1, 1, 1, 1},
{1, 100},
});
}
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-config</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>skywalking-config</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
#最大消费线程数
consumer.max_consumer=2
#消费者最大等待时间
consumer.max_wait_time=5
#每个Buffer的最大个数
buffer.buffer_max_size=18000
#Buffer池的最大长度
buffer.pool_max_length=5
#发送的最大条数
sender.max_buffer_data_size=1000
#最大发送者的连接数阀比例
sender.send_connection_threshold=1
#发送服务端配置
#sender.sender_servers=127.0.0.1:34000
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册