diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/Chain.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/Chain.java index 0061bb3bb28145607f51c4dccaa5a30cd8648168..505c6f306e54cebf01560b63bdf558b6296b77a9 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/Chain.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/Chain.java @@ -6,7 +6,13 @@ import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class Chain { + private static Logger logger = LogManager + .getLogger(Chain.class); + private List chains; private int index = 0; @@ -19,9 +25,11 @@ public class Chain { if (index < chains.size()) { while (true) { try { - chains.get(index++).doChain(spans, this); + chains.get(index).doChain(spans, this); + index++; break; } catch (Throwable e) { + logger.error("do chain at index[{}] failure.", index, e); ServerHealthCollector.getCurrentHeathReading("storage-chain").updateData(ServerHeathReading.ERROR, "Failed to do chain action. spans list hash code:" + spans.hashCode() + ",Cause:" + e.getMessage()); } diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/chain/SaveToHBaseChain.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/chain/SaveToHBaseChain.java index fbe54abca68abf26b1335f4666c3f0b363f75a30..39a02c744baaada16c5539358e7f9852a4462324 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/chain/SaveToHBaseChain.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/storage/chain/SaveToHBaseChain.java @@ -22,112 +22,143 @@ import java.util.ArrayList; import java.util.List; public class SaveToHBaseChain implements IStorageChain { - private static Logger logger = LogManager.getLogger(SaveToHBaseChain.class); - private static Configuration configuration = null; - private static Connection connection; + private static Logger logger = LogManager.getLogger(SaveToHBaseChain.class); + private static Configuration configuration = null; + private static Connection connection; - @Override - public void doChain(List spans, Chain chain) { - bulkInsertBuriedPointData(spans); - chain.doChain(spans); - } + @Override + public void doChain(List spans, Chain chain) { + if (connection == null || connection.isClosed()) { + initHBaseClient(); + } + bulkInsertBuriedPointData(spans); + chain.doChain(spans); + } - private static void initHBaseClient() throws IOException { - if (configuration == null) { - configuration = HBaseConfiguration.create(); - if (Config.HBaseConfig.ZK_HOSTNAME == null || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) { - logger.error("Miss HBase ZK quorum Configuration", new IllegalArgumentException("Miss HBase ZK quorum Configuration")); - System.exit(-1); - } - configuration.set("hbase.zookeeper.quorum", Config.HBaseConfig.ZK_HOSTNAME); - configuration.set("hbase.zookeeper.property.clientPort", Config.HBaseConfig.CLIENT_PORT); - connection = ConnectionFactory.createConnection(configuration); - } - } + private synchronized static void initHBaseClient() throws ChainException { + if (configuration == null) { + configuration = HBaseConfiguration.create(); + if (Config.HBaseConfig.ZK_HOSTNAME == null + || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) { + logger.error("Miss HBase ZK quorum Configuration", + new IllegalArgumentException( + "Miss HBase ZK quorum Configuration")); + System.exit(-1); + } + configuration.set("hbase.zookeeper.quorum", + Config.HBaseConfig.ZK_HOSTNAME); + configuration.set("hbase.zookeeper.property.clientPort", + Config.HBaseConfig.CLIENT_PORT); + } + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + ServerHealthCollector.getCurrentHeathReading("hbase").updateData( + ServerHeathReading.ERROR, "connect to hbase failure."); + throw new ChainException("initHBaseClient failure", e); + } + } - static { - try { - initHBaseClient(); - Admin admin = connection.getAdmin(); - if (!admin.isTableAvailable(TableName.valueOf(Config.HBaseConfig.TABLE_NAME))) { - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(Config.HBaseConfig.TABLE_NAME)); - tableDesc.addFamily(new HColumnDescriptor(Config.HBaseConfig.FAMILY_COLUMN_NAME)); - admin.createTable(tableDesc); - logger.info("Create table [{}] ok!", Config.HBaseConfig.TABLE_NAME); - } - } catch (IOException e) { - logger.error("Create table[{}] failed", Config.HBaseConfig.TABLE_NAME, e); - } - } + static { + try { + initHBaseClient(); + Admin admin = connection.getAdmin(); + if (!admin.isTableAvailable(TableName + .valueOf(Config.HBaseConfig.TABLE_NAME))) { + HTableDescriptor tableDesc = new HTableDescriptor( + TableName.valueOf(Config.HBaseConfig.TABLE_NAME)); + tableDesc.addFamily(new HColumnDescriptor( + Config.HBaseConfig.FAMILY_COLUMN_NAME)); + admin.createTable(tableDesc); + logger.info("Create table [{}] ok!", + Config.HBaseConfig.TABLE_NAME); + } + } catch (IOException e) { + logger.error("Create table[{}] failed", + Config.HBaseConfig.TABLE_NAME, e); + } + } - private static void insert(String tableName, Put put) { - try { - Table table = connection.getTable(TableName.valueOf(tableName)); - table.put(put); - } catch (IOException e) { - ServerHealthCollector.getCurrentHeathReading("hbase").updateData(ServerHeathReading.ERROR, "save RowKey[" + put.getId() + "] failure."); - throw new RuntimeException("Insert the data error.RowKey:[" + put.getId() + "]", e); - } + private static void insert(String tableName, Put put) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + table.put(put); + } catch (IOException e) { + ServerHealthCollector.getCurrentHeathReading("hbase").updateData( + ServerHeathReading.ERROR, + "save RowKey[" + put.getId() + "] failure."); + throw new ChainException("Insert the data error.RowKey:[" + + put.getId() + "]", e); + } - } + } - private static void bulkInsertBuriedPointData(List spans) { - if (spans == null || spans.size() <= 0) - return; - List puts = new ArrayList(); - Put put; - String columnName; - for (Span span : spans) { - put = new Put(Bytes.toBytes(span.getTraceId()), getTSBySpanTraceId(span)); - if (StringUtils.isEmpty(span.getParentLevel().trim())) { - columnName = span.getLevelId() + ""; - if (span.isReceiver()) { - columnName = span.getLevelId() + "-S"; - } - put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName), - Bytes.toBytes(span.getOriginData())); - } else { - columnName = span.getParentLevel() + "." + span.getLevelId(); - if (span.isReceiver()) { - columnName = span.getParentLevel() + "." + span.getLevelId() + "-S"; - } - put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName), - Bytes.toBytes(span.getOriginData())); - } - puts.add(put); - } + private static void bulkInsertBuriedPointData(List spans) { + if (spans == null || spans.size() <= 0) + return; + List puts = new ArrayList(); + Put put; + String columnName; + for (Span span : spans) { + put = new Put(Bytes.toBytes(span.getTraceId()), + getTSBySpanTraceId(span)); + if (StringUtils.isEmpty(span.getParentLevel().trim())) { + columnName = span.getLevelId() + ""; + if (span.isReceiver()) { + columnName = span.getLevelId() + "-S"; + } + put.addColumn( + Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), + Bytes.toBytes(columnName), + Bytes.toBytes(span.getOriginData())); + } else { + columnName = span.getParentLevel() + "." + span.getLevelId(); + if (span.isReceiver()) { + columnName = span.getParentLevel() + "." + + span.getLevelId() + "-S"; + } + put.addColumn( + Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), + Bytes.toBytes(columnName), + Bytes.toBytes(span.getOriginData())); + } + puts.add(put); + } - bulkInsertBuriedPointData(Config.HBaseConfig.TABLE_NAME, puts); + bulkInsertBuriedPointData(Config.HBaseConfig.TABLE_NAME, puts); - ServerHealthCollector.getCurrentHeathReading("hbase").updateData(ServerHeathReading.INFO, "save " + spans.size() + " BuriedPointEntries."); - } + ServerHealthCollector.getCurrentHeathReading("hbase").updateData( + ServerHeathReading.INFO, + "save " + spans.size() + " BuriedPointEntries."); + } - private static long getTSBySpanTraceId(Span span) { - try{ - return Long.parseLong(span.getTraceId().split("\\.")[2]); - }catch(Throwable t){ - Log.warn("can't get timestamp from trace id:" + span.getTraceId() + ", going to use current timestamp."); - return System.currentTimeMillis(); - } - } + private static long getTSBySpanTraceId(Span span) { + try { + return Long.parseLong(span.getTraceId().split("\\.")[2]); + } catch (Throwable t) { + Log.warn("can't get timestamp from trace id:{}, going to use current timestamp.", span.getTraceId(), t); + return System.currentTimeMillis(); + } + } - private static void bulkInsertBuriedPointData(String tableName, List data) { - Object[] resultArrays = new Object[data.size()]; - try { - Table table = connection.getTable(TableName.valueOf(tableName)); - table.batch(data, resultArrays); - int index = 0; - for (Object result : resultArrays) { - if (result != null) { - insert(tableName, data.get(index)); - } - index++; - } - } catch (IOException e) { - throw new ChainException(e); - } catch (InterruptedException e) { - throw new ChainException(e); - } + private static void bulkInsertBuriedPointData(String tableName, + List data) { + Object[] resultArrays = new Object[data.size()]; + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + table.batch(data, resultArrays); + int index = 0; + for (Object result : resultArrays) { + if (result != null) { + insert(tableName, data.get(index)); + } + index++; + } + } catch (IOException e) { + throw new ChainException(e); + } catch (InterruptedException e) { + throw new ChainException(e); + } - } + } }