提交 962db2ba 编写于 作者: wu-sheng's avatar wu-sheng

1.解决chain异常时可能造成死循环的问题

2.增加部分异常输出
3.解决SaveToHBaseChain,建立base连接失败,无法在运行态重连的问题
上级 624a46df
...@@ -6,7 +6,13 @@ import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading; ...@@ -6,7 +6,13 @@ import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
import java.util.List; import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Chain { public class Chain {
private static Logger logger = LogManager
.getLogger(Chain.class);
private List<IStorageChain> chains; private List<IStorageChain> chains;
private int index = 0; private int index = 0;
...@@ -19,9 +25,11 @@ public class Chain { ...@@ -19,9 +25,11 @@ public class Chain {
if (index < chains.size()) { if (index < chains.size()) {
while (true) { while (true) {
try { try {
chains.get(index++).doChain(spans, this); chains.get(index).doChain(spans, this);
index++;
break; break;
} catch (Throwable e) { } catch (Throwable e) {
logger.error("do chain at index[{}] failure.", index, e);
ServerHealthCollector.getCurrentHeathReading("storage-chain").updateData(ServerHeathReading.ERROR, ServerHealthCollector.getCurrentHeathReading("storage-chain").updateData(ServerHeathReading.ERROR,
"Failed to do chain action. spans list hash code:" + spans.hashCode() + ",Cause:" + e.getMessage()); "Failed to do chain action. spans list hash code:" + spans.hashCode() + ",Cause:" + e.getMessage());
} }
......
...@@ -22,112 +22,143 @@ import java.util.ArrayList; ...@@ -22,112 +22,143 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
public class SaveToHBaseChain implements IStorageChain { public class SaveToHBaseChain implements IStorageChain {
private static Logger logger = LogManager.getLogger(SaveToHBaseChain.class); private static Logger logger = LogManager.getLogger(SaveToHBaseChain.class);
private static Configuration configuration = null; private static Configuration configuration = null;
private static Connection connection; private static Connection connection;
@Override @Override
public void doChain(List<Span> spans, Chain chain) { public void doChain(List<Span> spans, Chain chain) {
bulkInsertBuriedPointData(spans); if (connection == null || connection.isClosed()) {
chain.doChain(spans); initHBaseClient();
} }
bulkInsertBuriedPointData(spans);
chain.doChain(spans);
}
private static void initHBaseClient() throws IOException { private synchronized static void initHBaseClient() throws ChainException {
if (configuration == null) { if (configuration == null) {
configuration = HBaseConfiguration.create(); configuration = HBaseConfiguration.create();
if (Config.HBaseConfig.ZK_HOSTNAME == null || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) { if (Config.HBaseConfig.ZK_HOSTNAME == null
logger.error("Miss HBase ZK quorum Configuration", new IllegalArgumentException("Miss HBase ZK quorum Configuration")); || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) {
System.exit(-1); logger.error("Miss HBase ZK quorum Configuration",
} new IllegalArgumentException(
configuration.set("hbase.zookeeper.quorum", Config.HBaseConfig.ZK_HOSTNAME); "Miss HBase ZK quorum Configuration"));
configuration.set("hbase.zookeeper.property.clientPort", Config.HBaseConfig.CLIENT_PORT); System.exit(-1);
connection = ConnectionFactory.createConnection(configuration); }
} configuration.set("hbase.zookeeper.quorum",
} Config.HBaseConfig.ZK_HOSTNAME);
configuration.set("hbase.zookeeper.property.clientPort",
Config.HBaseConfig.CLIENT_PORT);
}
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
ServerHealthCollector.getCurrentHeathReading("hbase").updateData(
ServerHeathReading.ERROR, "connect to hbase failure.");
throw new ChainException("initHBaseClient failure", e);
}
}
static { static {
try { try {
initHBaseClient(); initHBaseClient();
Admin admin = connection.getAdmin(); Admin admin = connection.getAdmin();
if (!admin.isTableAvailable(TableName.valueOf(Config.HBaseConfig.TABLE_NAME))) { if (!admin.isTableAvailable(TableName
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(Config.HBaseConfig.TABLE_NAME)); .valueOf(Config.HBaseConfig.TABLE_NAME))) {
tableDesc.addFamily(new HColumnDescriptor(Config.HBaseConfig.FAMILY_COLUMN_NAME)); HTableDescriptor tableDesc = new HTableDescriptor(
admin.createTable(tableDesc); TableName.valueOf(Config.HBaseConfig.TABLE_NAME));
logger.info("Create table [{}] ok!", Config.HBaseConfig.TABLE_NAME); tableDesc.addFamily(new HColumnDescriptor(
} Config.HBaseConfig.FAMILY_COLUMN_NAME));
} catch (IOException e) { admin.createTable(tableDesc);
logger.error("Create table[{}] failed", Config.HBaseConfig.TABLE_NAME, e); logger.info("Create table [{}] ok!",
} Config.HBaseConfig.TABLE_NAME);
} }
} catch (IOException e) {
logger.error("Create table[{}] failed",
Config.HBaseConfig.TABLE_NAME, e);
}
}
private static void insert(String tableName, Put put) { private static void insert(String tableName, Put put) {
try { try {
Table table = connection.getTable(TableName.valueOf(tableName)); Table table = connection.getTable(TableName.valueOf(tableName));
table.put(put); table.put(put);
} catch (IOException e) { } catch (IOException e) {
ServerHealthCollector.getCurrentHeathReading("hbase").updateData(ServerHeathReading.ERROR, "save RowKey[" + put.getId() + "] failure."); ServerHealthCollector.getCurrentHeathReading("hbase").updateData(
throw new RuntimeException("Insert the data error.RowKey:[" + put.getId() + "]", e); ServerHeathReading.ERROR,
} "save RowKey[" + put.getId() + "] failure.");
throw new ChainException("Insert the data error.RowKey:["
+ put.getId() + "]", e);
}
} }
private static void bulkInsertBuriedPointData(List<Span> spans) { private static void bulkInsertBuriedPointData(List<Span> spans) {
if (spans == null || spans.size() <= 0) if (spans == null || spans.size() <= 0)
return; return;
List<Put> puts = new ArrayList<Put>(); List<Put> puts = new ArrayList<Put>();
Put put; Put put;
String columnName; String columnName;
for (Span span : spans) { for (Span span : spans) {
put = new Put(Bytes.toBytes(span.getTraceId()), getTSBySpanTraceId(span)); put = new Put(Bytes.toBytes(span.getTraceId()),
if (StringUtils.isEmpty(span.getParentLevel().trim())) { getTSBySpanTraceId(span));
columnName = span.getLevelId() + ""; if (StringUtils.isEmpty(span.getParentLevel().trim())) {
if (span.isReceiver()) { columnName = span.getLevelId() + "";
columnName = span.getLevelId() + "-S"; if (span.isReceiver()) {
} columnName = span.getLevelId() + "-S";
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName), }
Bytes.toBytes(span.getOriginData())); put.addColumn(
} else { Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME),
columnName = span.getParentLevel() + "." + span.getLevelId(); Bytes.toBytes(columnName),
if (span.isReceiver()) { Bytes.toBytes(span.getOriginData()));
columnName = span.getParentLevel() + "." + span.getLevelId() + "-S"; } else {
} columnName = span.getParentLevel() + "." + span.getLevelId();
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName), if (span.isReceiver()) {
Bytes.toBytes(span.getOriginData())); columnName = span.getParentLevel() + "."
} + span.getLevelId() + "-S";
puts.add(put); }
} put.addColumn(
Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME),
Bytes.toBytes(columnName),
Bytes.toBytes(span.getOriginData()));
}
puts.add(put);
}
bulkInsertBuriedPointData(Config.HBaseConfig.TABLE_NAME, puts); bulkInsertBuriedPointData(Config.HBaseConfig.TABLE_NAME, puts);
ServerHealthCollector.getCurrentHeathReading("hbase").updateData(ServerHeathReading.INFO, "save " + spans.size() + " BuriedPointEntries."); ServerHealthCollector.getCurrentHeathReading("hbase").updateData(
} ServerHeathReading.INFO,
"save " + spans.size() + " BuriedPointEntries.");
}
private static long getTSBySpanTraceId(Span span) { private static long getTSBySpanTraceId(Span span) {
try{ try {
return Long.parseLong(span.getTraceId().split("\\.")[2]); return Long.parseLong(span.getTraceId().split("\\.")[2]);
}catch(Throwable t){ } catch (Throwable t) {
Log.warn("can't get timestamp from trace id:" + span.getTraceId() + ", going to use current timestamp."); Log.warn("can't get timestamp from trace id:{}, going to use current timestamp.", span.getTraceId(), t);
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
} }
private static void bulkInsertBuriedPointData(String tableName, List<Put> data) { private static void bulkInsertBuriedPointData(String tableName,
Object[] resultArrays = new Object[data.size()]; List<Put> data) {
try { Object[] resultArrays = new Object[data.size()];
Table table = connection.getTable(TableName.valueOf(tableName)); try {
table.batch(data, resultArrays); Table table = connection.getTable(TableName.valueOf(tableName));
int index = 0; table.batch(data, resultArrays);
for (Object result : resultArrays) { int index = 0;
if (result != null) { for (Object result : resultArrays) {
insert(tableName, data.get(index)); if (result != null) {
} insert(tableName, data.get(index));
index++; }
} index++;
} catch (IOException e) { }
throw new ChainException(e); } catch (IOException e) {
} catch (InterruptedException e) { throw new ChainException(e);
throw new ChainException(e); } catch (InterruptedException e) {
} throw new ChainException(e);
}
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册