提交 743b36f0 编写于 作者: wu-sheng's avatar wu-sheng

1.为SDK增加大量的健康报告检查代码。定期输出健康日志,方便错误排查。

上级 8072abcd
......@@ -134,12 +134,4 @@
</plugin>
</plugins>
</build>
<distributionManagement>
<snapshotRepository>
<id>company-private-nexus-library-snapshots</id>
<name>company-private-nexus-library-snapshots</name>
<url>http://10.1.228.199:18081/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project>
......@@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.conf.Constants;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
import com.ai.cloud.skywalking.sender.DataSenderFactoryWithBalance;
import com.ai.cloud.skywalking.util.AtomicRangeInteger;
......@@ -42,8 +44,10 @@ public class BufferGroup {
logger.warn(
"Group[{}] index[{}] data collision, discard old data.",
groupName, i);
SDKHealthCollector.getCurrentHeathReading("BufferGroup").updateData(HeathReading.WARNING, "BufferGroup index[" + i + "] data collision, data been coverd.");
}
dataBuffer[i] = span;
SDKHealthCollector.getCurrentHeathReading("BufferGroup").updateData(HeathReading.INFO, "save span");
}
class ConsumerWorker extends Thread {
......
package com.ai.cloud.skywalking.conf;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
public class AuthDesc {
static boolean isAuth = false;
static {
ConfigInitializer.initialize();
ConfigValidator.validate();
SDKHealthCollector.init();
}
public static boolean isAuth() {
......
......@@ -74,4 +74,9 @@ public class Config {
public static long RETRY_FIND_CONNECTION_SENDER = 1000;
}
public static class HealthCollector {
// 默认健康检查上报时间
public static long REPORT_INTERVAL = 5 * 60 * 1000L;
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.selfexamination;
import java.util.HashMap;
import java.util.Map;
public class HeathReading {
public static final String ERROR = "[ERROR]";
public static final String WARNING = "[WARNING]";
public static final String INFO = "[INFO]";
private String id;
private Map<String, HeathDetailData> datas = new HashMap<String, HeathDetailData>();
/**
* 健康读数,只应该在工作线程中创建
*
*/
public HeathReading(String id) {
this.id = id;
}
public void updateData(String key, String newData){
if(datas.containsKey(key)){
datas.get(key).updateData(newData);
}else{
datas.put(key, new HeathDetailData(newData));
}
}
@Override
public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("id<").append(this.id).append(">\n");
for(Map.Entry<String, HeathDetailData> data : datas.entrySet()){
sb.append(data.getKey()).append(data.getValue().toString()).append("\n");
}
//reset data
datas = new HashMap<String, HeathReading.HeathDetailData>();
return sb.toString();
}
class HeathDetailData{
private String data;
private long statusTime;
HeathDetailData(String initialData){
data = initialData;
statusTime = System.currentTimeMillis();
}
void updateData(String newData){
data = newData;
statusTime = System.currentTimeMillis();
}
String getData() {
return data;
}
long getStatusTime() {
return statusTime;
}
@Override
public String toString(){
return data + "(t:" + statusTime + ")";
}
}
}
package com.ai.cloud.skywalking.selfexamination;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.conf.AuthDesc;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.util.BuriedPointMachineUtil;
public class SDKHealthCollector extends Thread {
private Logger logger = LogManager.getLogger(SDKHealthCollector.class);
private static Map<String, HeathReading> heathReadings = new ConcurrentHashMap<String, HeathReading>();
private SDKHealthCollector(){
super("HealthCollector");
}
public static void init(){
if(AuthDesc.isAuth()){
new SDKHealthCollector().start();
}
}
public static HeathReading getCurrentHeathReading(String extraId) {
String id = getId(extraId);
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
heathReadings.put(id, new HeathReading(id));
}
}
}
return heathReadings.get(id);
}
private static String getId(String extraId) {
return "SDK-API,M:" + BuriedPointMachineUtil.getHostDesc() + ",P:"
+ BuriedPointMachineUtil.getProcessNo() + ",T:"
+ Thread.currentThread().getName() + "("
+ Thread.currentThread().getId() + ")"
+ (extraId == null ? "" : ",extra:" + extraId);
}
@Override
public void run() {
while (true) {
try {
Map<String, HeathReading> heathReadingsSnapshot = heathReadings;
heathReadings = new ConcurrentHashMap<String, HeathReading>();
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------SDK Health Collector Report---------\n");
for(String key : keyList){
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
log.append("------------------------------------------------\n");
logger.info(log);
try {
Thread.sleep(Config.HealthCollector.REPORT_INTERVAL);
} catch (InterruptedException e) {
logger.warn("sleep error.", e);
}
} catch (Throwable t) {
logger.error("SDKHealthCollector report error.", t);
}
}
}
}
......@@ -18,6 +18,8 @@ import com.ai.cloud.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import com.ai.cloud.io.netty.handler.codec.LengthFieldPrepender;
import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayDecoder;
import com.ai.cloud.io.netty.handler.codec.bytes.ByteArrayEncoder;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
public class DataSender implements IDataSender {
private EventLoopGroup group;
......@@ -71,12 +73,15 @@ public class DataSender implements IDataSender {
try {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(data.getBytes());
SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.INFO, "DataSender send data successfully.");
return true;
}else{
DataSenderFactoryWithBalance.unRegister(this);
SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.WARNING, "DataSender channel isn't active. unregister sender.");
}
} catch (Exception e) {
DataSenderFactoryWithBalance.unRegister(this);
SDKHealthCollector.getCurrentHeathReading("sender").updateData(HeathReading.WARNING, "DataSender channel broken. unregister sender.");
}
return false;
......
......@@ -20,6 +20,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
import com.ai.cloud.skywalking.util.StringUtil;
public class DataSenderFactoryWithBalance {
......@@ -131,6 +133,7 @@ public class DataSenderFactoryWithBalance {
unusedServerAddresses.add(tmpDataSender
.getServerIp());
senderIterator.remove();
SDKHealthCollector.getCurrentHeathReading("remove").updateData(HeathReading.INFO, "remove disconnected sender.");
}
}
......@@ -141,7 +144,7 @@ public class DataSenderFactoryWithBalance {
break;
}
usingDataSender.add(newSender);
SDKHealthCollector.getCurrentHeathReading("add").updateData(HeathReading.INFO, "add new sender.");
}
// try to switch.
......@@ -175,11 +178,15 @@ public class DataSenderFactoryWithBalance {
.getServerIp());
unusedServerAddresses.add(toBeSwitchSender
.getServerIp());
SDKHealthCollector.getCurrentHeathReading("switch").updateData(HeathReading.INFO, "switch existed sender.");
}
}
sleepTime = 0;
}
SDKHealthCollector.getCurrentHeathReading(null).updateData(HeathReading.INFO, "using available DataSender size:" + usingDataSender);
} catch (Throwable e) {
SDKHealthCollector.getCurrentHeathReading(null).updateData(HeathReading.ERROR, "DataSenderChecker running failed:" + e.getMessage());
logger.error("DataSenderChecker running failed", e);
}
......
......@@ -5,6 +5,9 @@ import static com.ai.cloud.skywalking.conf.Config.Sender.MAX_COPY_NUM;
import java.util.HashSet;
import java.util.Set;
import com.ai.cloud.skywalking.selfexamination.HeathReading;
import com.ai.cloud.skywalking.selfexamination.SDKHealthCollector;
/**
* 带副本的数据发送器
*
......@@ -52,6 +55,7 @@ public class DataSenderWithCopies implements IDataSender {
successNum++;
}
}
SDKHealthCollector.getCurrentHeathReading("DataSenderWithCopies").updateData(HeathReading.INFO, "DataSender send data with copynum=" + successNum + " successfully.");
if (senders.size() == 1 && successNum == 1) {
return true;
} else if (successNum >= 2) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册