提交 3a7e331a 编写于 作者: wu-sheng's avatar wu-sheng

Fix some resources leak. Format log. Set jdk level of common lib to 1.6 . Fix...

Fix some resources leak. Format log. Set jdk level of common lib to 1.6 . Fix some incorrect pom.xml .
上级 82edc01c
......@@ -8,13 +8,13 @@
<version>2.0-2016</version>
<modules>
<module>skywalking-commons</module>
<module>skywalking-alarm</module>
<module>skywalking-webui</module>
<module>skywalking-sniffer</module>
<module>skywalking-storage-center</module>
<module>samples/skywalking-auth</module>
<module>samples/skywalking-example</module>
<module>skywalking-commons</module>
</modules>
<packaging>pom</packaging>
......@@ -23,7 +23,6 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.version>2.0-2016</project.version>
</properties>
<dependencies>
......
......@@ -21,6 +21,18 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.version>2.0-2016</project.version>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -17,7 +17,7 @@
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>2.0-2016</version>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
package com.a.eye.skywalking.health.report;
import com.a.eye.skywalking.health.report.util.MachineUtil;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
......@@ -10,15 +9,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class HealthCollector extends Thread {
private static ILog logger =
LogManager.getLogger(HealthCollector.class);
private static Map<String, HeathReading> heathReadings =
new ConcurrentHashMap<String, HeathReading>();
private static ILog logger = LogManager.getLogger(HealthCollector.class);
private static Map<String, HeathReading> heathReadings = new ConcurrentHashMap<String, HeathReading>();
private static final long DEFAULT_REPORT_INTERVAL = 60 * 1000;
private final long reportInterval;
private final long reportInterval;
private String reporterName;
private HealthCollector() {
private HealthCollector(String reporterName) {
this(DEFAULT_REPORT_INTERVAL);
this.reporterName = reporterName;
}
private HealthCollector(long reportInterval) {
......@@ -27,8 +26,8 @@ public class HealthCollector extends Thread {
this.reportInterval = reportInterval;
}
public static void init() {
new HealthCollector().start();
public static void init(String reporterName) {
new HealthCollector(reporterName).start();
}
public static HeathReading getCurrentHeathReading(String extraId) {
......@@ -37,8 +36,7 @@ public class HealthCollector extends Thread {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if (heathReadings.keySet().size() > 5000) {
throw new RuntimeException(
"use HealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
throw new RuntimeException("use HealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
}
heathReadings.put(id, new HeathReading(id));
}
......@@ -48,8 +46,7 @@ public class HealthCollector extends Thread {
}
private static String getId(String extraId) {
return "SkyWalking Health Report,P:" + MachineUtil.getProcessNo() + ",T:" + Thread.currentThread().getName()
+ "(" + Thread.currentThread().getId() + ")" + (extraId == null ? "" : ",extra:" + extraId);
return "T:" + Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")" + (extraId == null ? "" : ",extra:" + extraId);
}
@Override
......@@ -61,7 +58,7 @@ public class HealthCollector extends Thread {
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------Server Health Collector Report---------\n");
log.append("\n---------" + reporterName + " Health Report---------\n");
for (String key : keyList) {
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
......
package com.a.eye.skywalking.health.report.util;
import java.lang.management.ManagementFactory;
/**
* Created by xin on 2016/11/14.
*/
public class MachineUtil {
private static String processNo;
static {
processNo = getProcessNo();
}
public static String getProcessNo() {
if (processNo == null) {
String name = ManagementFactory.getRuntimeMXBean().getName();
processNo = name.split("@")[0];
}
return processNo;
}
private MachineUtil() {
// Non
}
}
......@@ -18,20 +18,20 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>2.0-2016</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-impl-log4j2</artifactId>
<version>2.0-2016</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
</dependencies>
</project>
......@@ -24,11 +24,15 @@
<artifactId>skywalking-logging</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-network</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>data-carrier</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
</project>
......@@ -17,17 +17,17 @@
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-network</artifactId>
<artifactId>skywalking-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-registry</artifactId>
<version>2.0-2016</version>
<artifactId>skywalking-logging-impl-log4j2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-impl-log4j2</artifactId>
<artifactId>skywalking-health-report</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......@@ -35,27 +35,12 @@
<artifactId>hsqldb</artifactId>
<version>2.3.4</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>data-carrier</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-health-report</artifactId>
<version>2.0-2016</version>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -29,7 +29,8 @@ import static com.a.eye.skywalking.storage.config.Config.RegistryCenter.PATH_PRE
*/
public class Main {
private static ILog logger = LogManager.getLogger(Main.class);
private static final ILog logger = LogManager.getLogger(Main.class);
private static final String SERVER_REPORTER_NAME = "Storage Server";
static {
LogManager.setLogResolver(new Log4j2Resolver());
......@@ -41,7 +42,7 @@ public class Main {
try {
initializeParam();
HealthCollector.init();
HealthCollector.init(SERVER_REPORTER_NAME);
DataFilesManager.init();
......@@ -54,15 +55,15 @@ public class Main {
provider.start();
if (logger.isDebugEnable()) {
logger.debug("Service provider started successfully.");
logger.debug("Service provider started.");
}
registryNode();
logger.info("Storage service started successfully.");
logger.info("SkyWalking storage server started.");
Thread.currentThread().join();
} catch (Throwable e) {
logger.error("Failed to start service.", e);
logger.error("SkyWalking storage server start failure.", e);
} finally {
provider.stop();
IndexDataCapacityMonitor.stop();
......@@ -88,10 +89,10 @@ public class Main {
printStorageConfig(properties);
ConfigInitializer.initialize(properties, Config.class);
} catch (IllegalAccessException e) {
logger.error("Initialize the collect server configuration failed", e);
logger.error("Initialize server configuration failure.", e);
throw e;
} catch (IOException e) {
logger.error("Initialize the collect server configuration failed", e);
logger.error("Initialize server configuration failure.", e);
throw e;
}
}
......
......@@ -53,4 +53,10 @@ public class SpanDataConsumer implements IConsumer<SpanData> {
HealthCollector.getCurrentHeathReading("SpanDataConsumer").updateData(HeathReading.ERROR,
"Failed to consume span data. error message : " + throwable.getMessage());
}
@Override
public void onExit() {
cache.close();
fileWriter.close();
}
}
......@@ -5,6 +5,7 @@ import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.config.Constants;
import com.a.eye.skywalking.storage.data.exception.ConnectionNotFoundException;
import com.a.eye.skywalking.storage.data.file.DataFileReader;
import com.a.eye.skywalking.storage.data.index.*;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
......@@ -13,15 +14,16 @@ import com.zaxxer.hikari.HikariDataSource;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_PASSWORD;
import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_USER;
import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath;
public class SpanDataFinder {
private static ILog logger = LogManager.getLogger(SpanDataFinder.class);
private static IndexDataSourceCache datasourceCache =
new IndexDataSourceCache(Config.Finder.CACHED_SIZE);
private static ILog logger = LogManager.getLogger(SpanDataFinder.class);
private static IndexDataSourceCache datasourceCache = new IndexDataSourceCache(Config.Finder.CACHED_SIZE);
private static ReentrantLock createDatasourceLock = new ReentrantLock();
public static List<SpanData> find(String traceId) {
long blockIndex = BlockIndexEngine.newFinder().find(fetchStartTimeFromTraceId(traceId));
......@@ -29,22 +31,40 @@ public class SpanDataFinder {
return new ArrayList<SpanData>();
}
IndexDBConnector indexDBConnector = fetchIndexDBConnector(blockIndex);
IndexMetaCollection indexMetaCollection = indexDBConnector.queryByTraceId(traceId);
indexDBConnector.close();
IndexDBConnector indexDBConnector = null;
IndexMetaCollection indexMetaCollection = null;
try {
indexDBConnector = fetchIndexDBConnector(blockIndex);
indexMetaCollection = indexDBConnector.queryByTraceId(traceId);
} finally {
if (indexDBConnector != null) {
indexDBConnector.close();
}
}
Iterator<IndexMetaGroup<String>> iterator =
IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder<String>() {
@Override
public String buildKey(IndexMetaInfo metaInfo) {
return metaInfo.getFileName();
}
}).iterator();
if (indexMetaCollection == null) {
return new ArrayList<SpanData>();
}
Iterator<IndexMetaGroup<String>> iterator = IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder<String>() {
@Override
public String buildKey(IndexMetaInfo metaInfo) {
return metaInfo.getFileName();
}
}).iterator();
List<SpanData> result = new ArrayList<SpanData>();
while (iterator.hasNext()) {
IndexMetaGroup<String> group = iterator.next();
result.addAll(new DataFileReader(group.getKey()).read(group.getMetaInfo()));
DataFileReader reader = null;
try {
reader = new DataFileReader(group.getKey());
result.addAll(reader.read(group.getMetaInfo()));
} finally {
if (reader != null) {
reader.close();
}
}
}
return result;
......@@ -52,12 +72,11 @@ public class SpanDataFinder {
private static IndexDBConnector fetchIndexDBConnector(long blockIndex) {
HikariDataSource datasource = getOrCreate(blockIndex);
IndexDBConnector indexDBConnector = null;
IndexDBConnector indexDBConnector;
try {
indexDBConnector = new IndexDBConnector(datasource.getConnection());
} catch (SQLException e) {
logger.warn("Failed to get connection from datasource,", e);
indexDBConnector = new IndexDBConnector(blockIndex);
throw new ConnectionNotFoundException("get connection failure.", e);
}
return indexDBConnector;
}
......@@ -65,17 +84,23 @@ public class SpanDataFinder {
private static HikariDataSource getOrCreate(long blockIndex) {
HikariDataSource datasource = datasourceCache.get(blockIndex);
if (datasource == null) {
HikariConfig dataSourceConfig = generateDatasourceConfig(blockIndex);
datasource = new HikariDataSource(dataSourceConfig);
datasourceCache.put(blockIndex, datasource);
createDatasourceLock.lock();
try {
if (datasource == null) {
HikariConfig dataSourceConfig = generateDatasourceConfig(blockIndex);
datasource = new HikariDataSource(dataSourceConfig);
datasourceCache.put(blockIndex, datasource);
}
} finally {
createDatasourceLock.unlock();
}
}
return datasource;
}
private static HikariConfig generateDatasourceConfig(long blockIndex) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH),
Config.DataIndex.FILE_NAME).generate(blockIndex));
config.setJdbcUrl(new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH), Config.DataIndex.FILE_NAME).generate(blockIndex));
config.setDriverClassName(Constants.DRIVER_CLASS_NAME);
config.setUsername(DEFAULT_USER);
config.setPassword(DEFAULT_PASSWORD);
......
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/5.
*/
public class ConnectionNotFoundException extends RuntimeException {
public ConnectionNotFoundException(String message, Exception e) {
super(message, e);
}
}
package com.a.eye.skywalking.storage.data.exception;
public class IndexMetaPersistenceFailedException extends RuntimeException {
public IndexMetaPersistenceFailedException(String message, Exception e){
public class IndexMetaStoredFailedException extends RuntimeException {
public IndexMetaStoredFailedException(String message, Exception e){
super(message, e);
}
}
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/4.
*/
public class SpanDataPersistenceFailedException extends RuntimeException {
public SpanDataPersistenceFailedException(Exception e) {
public class SpanDataStoredFailedException extends RuntimeException {
public SpanDataStoredFailedException(Exception e) {
}
}
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.exception.DataFileOperatorCreateFailedException;
import com.a.eye.skywalking.storage.data.exception.SpanDataPersistenceFailedException;
import com.a.eye.skywalking.storage.data.exception.SpanDataStoredFailedException;
import com.a.eye.skywalking.storage.data.exception.SpanDataReadFailedException;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
......@@ -84,7 +84,7 @@ public class DataFile {
currentOffset += bytes.length;
return metaInfo;
} catch (IOException e) {
throw new SpanDataPersistenceFailedException(e);
throw new SpanDataStoredFailedException(e);
}
}
......@@ -92,10 +92,14 @@ public class DataFile {
try {
operator.getWriter().flush();
} catch (IOException e) {
throw new SpanDataPersistenceFailedException(e);
throw new SpanDataStoredFailedException(e);
}
}
public void close(){
operator.close();
}
public byte[] read(long offset, int length) {
byte[] data = new byte[length];
try {
......@@ -103,8 +107,7 @@ public class DataFile {
operator.getReader().read(data, 0, length);
return data;
} catch (IOException e) {
throw new SpanDataReadFailedException(
"Failed to read dataFile[" + fileName + "], offset: " + offset + " " + "lenght: " + length, e);
throw new SpanDataReadFailedException("Failed to read dataFile[" + fileName + "], offset: " + offset + " " + "lenght: " + length, e);
}
}
......@@ -136,6 +139,23 @@ public class DataFile {
return reader;
}
public void close() {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
}
}
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
}
}
}
}
private File getDataFile() {
......
......@@ -47,4 +47,8 @@ public class DataFileReader {
return metaData;
}
public void close(){
dataFile.close();
}
}
......@@ -15,6 +15,7 @@ public class DataFileWriter {
public IndexMetaCollection write(List<SpanData> spanData) {
if (dataFile.overLimitLength()) {
this.close();
dataFile = DataFilesManager.createNewDataFile();
}
......@@ -26,4 +27,8 @@ public class DataFileWriter {
return collections;
}
public void close(){
dataFile.close();
}
}
......@@ -22,6 +22,10 @@ public class IndexDBConnectorCache {
return connector;
}
public void close(){
cachedOperators.close();
}
private void updateCache(long timestamp, IndexDBConnector operator) {
cachedOperators.put(timestamp, operator);
}
......@@ -43,5 +47,11 @@ public class IndexDBConnectorCache {
public LRUCache(int cacheSize) {
super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
}
public void close(){
for(Map.Entry<Long, IndexDBConnector> entry : this.entrySet()){
entry.getValue().close();
}
}
}
}
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.data.IndexDataCapacityMonitor;
import com.a.eye.skywalking.storage.data.exception.IndexMetaPersistenceFailedException;
import java.util.ArrayList;
import java.util.List;
import com.a.eye.skywalking.storage.data.exception.IndexMetaStoredFailedException;
public class IndexOperator {
......@@ -21,7 +18,7 @@ public class IndexOperator {
connector.batchUpdate(metaGroup);
IndexDataCapacityMonitor.addIndexData(timestamp, metaGroup.size());
} catch (Exception e) {
throw new IndexMetaPersistenceFailedException("Failed to batch save index meta", e);
throw new IndexMetaStoredFailedException("Failed to batch save index meta", e);
}
}
......
......@@ -24,12 +24,12 @@ public class SearchListener implements TraceSearchListener {
SpanDataHelper helper = new SpanDataHelper(data);
List<Span> span = helper.category().mergeData();
HealthCollector.getCurrentHeathReading("SearchListener")
.updateData(HeathReading.INFO, span.size() + " spans was founded by trace Id [" + traceId + "].");
.updateData(HeathReading.INFO, span.size() + " spans was found by trace Id [" + traceId + "].");
return span;
} catch (Exception e) {
logger.error("Failed to search trace Id [{}]", traceId, e);
logger.error("Search trace Id[{}] failure.", traceId, e);
HealthCollector.getCurrentHeathReading("SearchListener")
.updateData(HeathReading.ERROR, "Failed to search trace Id" + traceId + ".");
.updateData(HeathReading.ERROR, "Search trace Id[" + traceId + "] failure.");
return new ArrayList<Span>();
}
}
......
......@@ -28,13 +28,11 @@ public class StorageListener implements SpanStorageListener {
public boolean storage(RequestSpan requestSpan) {
try {
spanDataDataCarrier.produce(SpanDataBuilder.build(requestSpan));
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO,"Request span "
+ "consume successfully");
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO,"RequestSpan stored.");
return true;
} catch (Exception e) {
logger.error("Failed to storage request span. Span Data:\n {}.", requestSpan.toByteString(), e);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR,"Request span "
+ "consume failed");
logger.error("RequestSpan trace-id[{}] store failure..", requestSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR,"RequestSpan store failure.");
return false;
}
}
......@@ -43,9 +41,11 @@ public class StorageListener implements SpanStorageListener {
public boolean storage(AckSpan ackSpan) {
try {
spanDataDataCarrier.produce(SpanDataBuilder.build(ackSpan));
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO,"AckSpan stored.");
return true;
} catch (Exception e) {
logger.error("Failed to storage ack span. ack Data:\n {}.", ackSpan.toByteString(), e);
logger.error("AckSpan trace-id[{}] store failure..", ackSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR,"AckSpan store failure.");
return false;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册