提交 eb52c386 编写于 作者: wu-sheng's avatar wu-sheng

Remoe health-report module. Because the HealthCollector is based on...

Remoe health-report module. Because the HealthCollector is based on ConcurrentHashMap. This triggers a performance issue, when the TPS is very high.( > 500)
上级 7637b1f8
......@@ -13,7 +13,6 @@
<modules>
<module>skywalking-trace</module>
<module>skywalking-logging</module>
<module>skywalking-health-report</module>
<module>skywalking-util</module>
</modules>
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-commons</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-health-report</artifactId>
<packaging>jar</packaging>
<name>skywalking-health-report</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
package com.a.eye.skywalking.health.report;
import com.a.eye.skywalking.api.logging.api.ILog;
import com.a.eye.skywalking.api.logging.api.LogManager;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class HealthCollector extends Thread {
private static ILog logger = LogManager.getLogger(HealthCollector.class);
private static Map<String, HeathReading> heathReadings = new ConcurrentHashMap<String, HeathReading>();
private static final long DEFAULT_REPORT_INTERVAL = 60 * 1000;
private final long reportInterval;
private String reporterName;
private HealthCollector(String reporterName) {
this(DEFAULT_REPORT_INTERVAL);
this.reporterName = reporterName;
}
private HealthCollector(long reportInterval) {
super("HealthCollector");
this.setDaemon(true);
this.reportInterval = reportInterval;
}
public static void init(String reporterName) {
new HealthCollector(reporterName).start();
}
public static HeathReading getCurrentHeathReading(String extraId) {
String id = getId(extraId);
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if (heathReadings.keySet().size() > 5000) {
logger.warn("use HealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
}else {
heathReadings.put(id, new HeathReading(id));
}
}
}
}
return heathReadings.get(id);
}
private static String getId(String extraId) {
return "T:" + Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")" + (extraId == null ? "" : ",extra:" + extraId);
}
@Override
public void run() {
while (true) {
try {
Map<String, HeathReading> heathReadingsSnapshot = heathReadings;
heathReadings = new ConcurrentHashMap<String, HeathReading>();
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------" + reporterName + " Health Report---------\n");
for (String key : keyList) {
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
log.append("------------------------------------------------\n");
logger.info(log.toString());
try {
Thread.sleep(reportInterval);
} catch (InterruptedException e) {
}
} catch (Throwable t) {
logger.error("HealthCollector report error.", t);
}
}
}
}
package com.a.eye.skywalking.health.report;
import java.util.HashMap;
import java.util.Map;
public class HeathReading {
public static final String ERROR = "[ERROR]";
public static final String WARNING = "[WARNING]";
public static final String INFO = "[INFO]";
private String id;
private Map<String, HeathDetailData> datas = new HashMap<String, HeathDetailData>();
/**
* 健康读数,只应该在工作线程中创建
*/
public HeathReading(String id) {
this.id = id;
}
public void updateData(String key, String message) {
updateData(key, message, new Object[0]);
}
public void updateData(String key, String newData, Object... arguments) {
if (datas.containsKey(key)) {
datas.get(key).updateData(newData, arguments);
} else {
datas.put(key, new HeathDetailData(newData, arguments));
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("id<").append(this.id).append(">\n");
for (Map.Entry<String, HeathDetailData> data : datas.entrySet()) {
sb.append(data.getKey()).append(data.getValue().toString()).append("\n");
}
datas = new HashMap<String, HeathReading.HeathDetailData>();
return sb.toString();
}
class HeathDetailData {
private String data;
private long statusTime;
HeathDetailData(String initialData) {
this(initialData, new Object[0]);
}
HeathDetailData(String initialData, Object[] arguments) {
data = initialData;
if (arguments.length > 0)
data = String.format(initialData, arguments);
statusTime = System.currentTimeMillis();
}
void updateData(String newData, Object... arguments) {
data = newData;
if (arguments.length > 0)
data = String.format(newData, arguments);
statusTime = System.currentTimeMillis();
}
String getData() {
return data;
}
long getStatusTime() {
return statusTime;
}
@Override
public String toString() {
return data + "(t:" + statusTime + ")";
}
}
}
......@@ -36,11 +36,6 @@
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-health-report</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
......
......@@ -3,8 +3,6 @@ package com.a.eye.skywalking.api.queue;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.context.TracerContextListener;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.trace.TraceSegment;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
......@@ -24,8 +22,6 @@ public enum TraceSegmentProcessQueue implements TracerContextListener {
try {
TraceSegmentHolder data = this.buffer.get(sequence);
data.setValue(traceSegment);
HealthCollector.getCurrentHeathReading("TraceSegmentProcessQueue").updateData(HeathReading.INFO, "receive finished traceSegment.");
} finally {
this.buffer.publish(sequence);
}
......@@ -36,7 +32,7 @@ public enum TraceSegmentProcessQueue implements TracerContextListener {
RingBuffer<TraceSegmentHolder> buffer;
TraceSegmentProcessQueue() {
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
disruptor = new Disruptor<>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
buffer = disruptor.getRingBuffer();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册