提交 9f373fc2 编写于 作者: A ascrutae

修改数据索引实现方式

上级 38cc8d03
......@@ -41,6 +41,16 @@
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -9,10 +9,8 @@ import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.config.ConfigInitializer;
import com.a.eye.skywalking.storage.data.IndexDataCapacityMonitor;
import com.a.eye.skywalking.storage.data.file.DataFilesManager;
import com.a.eye.skywalking.storage.listener.SearchListener;
import com.a.eye.skywalking.storage.listener.StorageListener;
......@@ -46,9 +44,6 @@ public class Main {
DataFilesManager.init();
BlockIndexEngine.start();
IndexDataCapacityMonitor.start();
provider = ServiceProvider.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener())
.addAsyncTraceSearchService(new SearchListener()).build();
......@@ -67,7 +62,6 @@ public class Main {
logger.error("SkyWalking storage server start failure.", e);
} finally {
provider.stop();
IndexDataCapacityMonitor.stop();
}
}
......
package com.a.eye.skywalking.storage.block.index;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
/**
* Created by xin on 2016/11/2.
*/
public class BlockFinder {
private static ILog logger = LogManager.getLogger(BlockFinder.class);
private L1Cache l1Cache;
private L2Cache l2Cache;
public BlockFinder(L1Cache l1Cache, L2Cache l2Cache) {
this.l1Cache = l1Cache;
this.l2Cache = l2Cache;
}
public long find(long timestamp) {
Long index = l1Cache.find(timestamp);
if (index == null) {
index = l2Cache.find(timestamp);
}
return index;
}
public long findLastBlockIndex() {
return l2Cache.getLastBlockIndex();
}
}
package com.a.eye.skywalking.storage.block.index;
public class BlockIndexEngine {
private static L1Cache l1Cache;
private static L2Cache l2Cache;
public static void start(){
l1Cache = new L1Cache();
l2Cache = new L2Cache();
newUpdator().init();
}
public static BlockFinder newFinder() {
return new BlockFinder(l1Cache, l2Cache);
}
public static BlockIndexUpdator newUpdator() {
return new BlockIndexUpdator(l1Cache, l2Cache);
}
}
package com.a.eye.skywalking.storage.block.index;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.block.index.exception.BlockIndexPersistenceFailedException;
import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static com.a.eye.skywalking.storage.config.Config.BlockIndex.FILE_NAME;
import static com.a.eye.skywalking.storage.config.Config.BlockIndex.PATH;
import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath;
public class BlockIndexUpdator {
private static ILog logger = LogManager.getLogger(BlockIndexUpdator.class);
private L1Cache l1Cache;
private L2Cache l2Cache;
public BlockIndexUpdator(L1Cache l1Cache, L2Cache l2Cache) {
this.l1Cache = l1Cache;
this.l2Cache = l2Cache;
}
public void addRecord(long timestamp) {
logger.info("Updating block index. index key:{}", timestamp);
try {
updateFile(timestamp);
updateCache(timestamp);
} catch (Exception e) {
logger.error("Failed to add block index record", e);
}
}
private void updateCache(long timestamp) {
l1Cache.add2Rebuild(timestamp);
l2Cache.add2Rebuild(timestamp);
}
private void updateFile(long timestamp) throws BlockIndexPersistenceFailedException {
BufferedWriter writer = null;
try {
File blockIndexFile = getOrCreateBlockIndexFile();
writer = new BufferedWriter(new FileWriter(blockIndexFile));
writer.write(String.valueOf(timestamp));
writer.newLine();
writer.close();
} catch (IOException e) {
throw new BlockIndexPersistenceFailedException("Failed to save index[" + timestamp + "]", e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close index file", e);
}
}
}
}
void init() {
List<Long> indexData = new ArrayList<>();
BufferedReader indexFileReader = null;
try {
File blockIndexFile = getOrCreateBlockIndexFile();
indexFileReader = new BufferedReader(new FileReader(blockIndexFile));
String indexDataStr = null;
while ((indexDataStr = indexFileReader.readLine()) != null) {
indexData.add(Long.parseLong(indexDataStr));
}
} catch (IOException e) {
logger.error("Failed to read index data.", e);
} finally {
if (indexFileReader != null) {
try {
indexFileReader.close();
} catch (IOException e) {
logger.error("Failed to close index file", e);
}
}
}
if (indexData.size() == 0) {
if (logger.isDebugEnable()) {
logger.debug("Any block index was not founded. will add new block index.", indexData.size());
}
//如果此前没有记录,则取之前五分钟到目前的数据
addRecord(System.currentTimeMillis() - 5 * 60 * 1000);
return;
}
if (logger.isDebugEnable()) {
logger.debug("There are {} block index was founded. Begin to init L1Cache and L2Cache", indexData.size());
}
Collections.reverse(indexData);
l1Cache.init(indexData);
l2Cache.init(indexData);
}
public File getOrCreateBlockIndexFile() throws IOException {
if (logger.isDebugEnable()) {
}
File blockIndexFile = new File(getAbsolutePath(PATH), FILE_NAME);
if (!blockIndexFile.getParentFile().exists()) {
blockIndexFile.getParentFile().mkdirs();
}
if (!blockIndexFile.exists()) {
blockIndexFile.createNewFile();
}
return blockIndexFile;
}
}
package com.a.eye.skywalking.storage.block.index;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.a.eye.skywalking.storage.config.Config.BlockIndexEngine.L1_CACHE_SIZE;
/**
* 块索引的一级缓存
* <p>
* Created by xin on 2016/11/2.
*/
public class L1Cache {
private static ILog logger = LogManager.getLogger(L1Cache.class);
private TreeSet<Long> cacheData = new TreeSet<Long>();
private final ReadWriteLock updateLock = new ReentrantReadWriteLock();
void init(List<Long> data) {
StringBuilder initData = new StringBuilder();
int size = data.size() > L1_CACHE_SIZE ? L1_CACHE_SIZE : data.size();
for (int i = 0; i < size; i++) {
this.cacheData.add(data.get(i));
initData.append(data.get(i)).append(",");
}
if (logger.isDebugEnable()) {
logger.info("L1 cache init data : [{}]", initData.deleteCharAt(initData.length() - 1));
}
}
public Long find(long timestamp) {
Lock lock = updateLock.readLock();
try {
lock.lock();
return cacheData.higher(timestamp);
} finally {
lock.unlock();
}
}
public void add2Rebuild(long timestamp) {
TreeSet<Long> newCacheData = new TreeSet<>(cacheData);
newCacheData.add(timestamp);
if (newCacheData.size() >= L1_CACHE_SIZE + 1) {
long removedData = newCacheData.pollFirst();
logger.info("Add cache data : {}, removed cache Data:{}", timestamp, removedData);
}
Lock lock = updateLock.writeLock();
try {
lock.lock();
cacheData = newCacheData;
} finally {
lock.unlock();
}
}
}
package com.a.eye.skywalking.storage.block.index;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 块索引的二级缓存
*/
public class L2Cache {
private ILog logger = LogManager.getLogger(L2Cache.class);
private TreeSet<Long> cacheData = new TreeSet<Long>();
private ReadWriteLock updateLock = new ReentrantReadWriteLock();
void init(List<Long> data) {
this.cacheData.addAll(data);
if (logger.isDebugEnable()) {
logger.info("L2 cache init data : {}", data);
}
}
public Long find(long timestamp) {
Lock lock = updateLock.readLock();
try {
lock.lock();
return this.cacheData.higher(timestamp);
} finally {
lock.unlock();
}
}
public void add2Rebuild(long timestamp) {
TreeSet<Long> newCacheData = new TreeSet<>(cacheData);
newCacheData.add(timestamp);
Lock lock = updateLock.writeLock();
try {
lock.lock();
cacheData.add(timestamp);
} finally {
lock.unlock();
}
}
public long getLastBlockIndex() {
return cacheData.last();
}
}
package com.a.eye.skywalking.storage.block.index.exception;
/**
* Created by xin on 2016/11/2.
*/
public class BlockIndexPersistenceFailedException extends Exception {
public BlockIndexPersistenceFailedException(String message, Exception e) {
super(message, e);
}
}
......@@ -8,6 +8,7 @@ public class Config {
public static int PORT = 34000;
}
public static class DataConsumer {
public static int CHANNEL_SIZE = 10;
......@@ -18,13 +19,6 @@ public class Config {
}
public static class BlockIndex {
public static String PATH = "/block_index";
public static String FILE_NAME = "data_file.index";
}
public static class DataFile {
public static String PATH = "/data/file";
......@@ -34,21 +28,19 @@ public class Config {
public static class DataIndex {
public static String PATH = "/data/index";
public static final int INDEX_LISTEN_PORT = 9300;
public static String FILE_NAME = "dataIndex";
}
public static long SIZE = 1000 * 1000 * 1000;
public static class IndexOperator {
public static class Operator {
public static int CACHE_SIZE = 5;
}
}
public static class Finder {
public static int TOTAL = 50;
public static class BlockIndexEngine {
public static int L1_CACHE_SIZE = 10;
public static int IDEL = 20;
}
}
......@@ -63,14 +55,4 @@ public class Config {
public static String PATH_PREFIX = "/skywalking/storage_list/";
}
public static class Finder {
public static int CACHED_SIZE = 10;
public static class DataSource {
public static int MAX_POOL_SIZE = 20;
public static int MIN_IDLE = 5;
}
}
}
......@@ -8,19 +8,19 @@ public class Constants {
public static int MAX_BATCH_SIZE = 50;
public static class SQL {
public static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "\n" + "(\n"
+ " id INT PRIMARY KEY NOT NULL IDENTITY,\n"
+ " tid_s0 INT NOT NULL,\n"
+ " tid_s1 BIGINT NOT NULL,\n"
+ " tid_s2 INT NOT NULL,\n"
+ " tid_s3 INT NOT NULL,\n"
+ " tid_s4 INT NOT NULL,\n"
+ " tid_s5 INT NOT NULL,\n"
+ " span_type INT NOT NULL, \n"
+ " file_name BIGINT NOT NULL,\n"
+ " file_name_suffix INT NOT NULL,\n"
+ " offset BIGINT NOT NULL,\n"
+ " length INT NOT NULL\n" + ");\n";
public static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "( "
+ " id INT PRIMARY KEY NOT NULL IDENTITY, "
+ " tid_s0 INT NOT NULL, "
+ " tid_s1 BIGINT NOT NULL, "
+ " tid_s2 INT NOT NULL, "
+ " tid_s3 INT NOT NULL, "
+ " tid_s4 INT NOT NULL, "
+ " tid_s5 INT NOT NULL, "
+ " span_type INT NOT NULL, "
+ " file_name BIGINT NOT NULL, "
+ " file_name_suffix INT NOT NULL, "
+ " offset BIGINT NOT NULL, "
+ " length INT NOT NULL " + "); ";
public static final String CREATE_INDEX = "CREATE INDEX \"index_data_trace_id_index\" ON " + TABLE_NAME + " "
+ "(tid_s0,tid_s1,tid_s2,tid_s3,tid_s4,tid_s5);";
......
package com.a.eye.skywalking.storage.data;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.data.index.IndexDBConnector;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.a.eye.skywalking.storage.config.Config.DataIndex.SIZE;
/**
* Created by xin on 2016/11/6.
*/
public class IndexDataCapacityMonitor {
private static ILog logger = LogManager.getLogger(IndexDataCapacityMonitor.class);
private static Detector detector;
public static void addIndexData(long timestamp, int size) {
if (detector.isDetectFor(timestamp)) {
detector.add(size);
}
}
private static class Detector extends TimerTask {
private AtomicLong currentSize;
private long timestamp;
private Timer timer = new Timer();
public Detector(long timestamp) {
this.timestamp = timestamp;
currentSize = new AtomicLong();
start();
}
public Detector(long timestamp, long currentSize) {
this.currentSize = new AtomicLong(currentSize);
this.timestamp = timestamp;
start();
}
public void start() {
timer.schedule(this, 0, TimeUnit.SECONDS.toMillis(30));
}
public void stop() {
timer.cancel();
}
public boolean isDetectFor(long timestamp) {
return this.timestamp == timestamp;
}
public void add(int updateRecordSize) {
currentSize.addAndGet(updateRecordSize);
}
@Override
public void run() {
if (currentSize.get() > SIZE * 0.8) {
stop();
notificationAddNewBlockIndexAndCreateNewIndexDB();
HealthCollector.getCurrentHeathReading("Index Data Capacity Detector").updateData(HeathReading.INFO,
"Detector is detecting the index [%d]. and the capacity of {} is %d", timestamp,
currentSize.get());
}
}
}
private static void notificationAddNewBlockIndexAndCreateNewIndexDB() {
long timestamp = System.currentTimeMillis() + 5 * 60 * 1000;
BlockIndexEngine.newUpdator().addRecord(timestamp);
logger.info("Create a new Index DB [{}]", timestamp);
createNewIndexDB(timestamp);
detector = new Detector(timestamp);
}
private static void createNewIndexDB(long timestamp) {
IndexDBConnector connector = new IndexDBConnector(timestamp);
connector.close();
}
public static void start() {
long timestamp = BlockIndexEngine.newFinder().findLastBlockIndex();
IndexDBConnector dbConnector = null;
try {
dbConnector = new IndexDBConnector(timestamp);
long count = 0;
try {
count = dbConnector.fetchIndexSize();
} catch (SQLException e) {
logger.error("Failed to to fetch index size from DB:{}", timestamp, e);
}
detector = new Detector(timestamp, count);
logger.info("Index data capacity monitor started successfully!");
} finally {
if (dbConnector != null) {
dbConnector.close();
}
}
}
public static void stop() {
detector.stop();
}
}
......@@ -5,48 +5,34 @@ import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.*;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.operator.IndexOperator;
import com.a.eye.skywalking.storage.data.index.operator.OperatorFactory;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import java.util.Iterator;
import java.util.List;
public class SpanDataConsumer implements IConsumer<SpanData> {
private static ILog logger = LogManager.getLogger(SpanDataConsumer.class);
private IndexDBConnectorCache cache;
private DataFileWriter fileWriter;
@Override
public void init() {
cache = new IndexDBConnectorCache();
fileWriter = new DataFileWriter();
}
@Override
public void consume(List<SpanData> data) {
Iterator<IndexMetaGroup<Long>> iterator =
IndexMetaCollections.group(fileWriter.write(data), new GroupKeyBuilder<Long>() {
@Override
public Long buildKey(IndexMetaInfo metaInfo) {
return BlockIndexEngine.newFinder().find(metaInfo.getTraceStartTime());
}
}).iterator();
IndexMetaCollection collection = fileWriter.write(data);
IndexOperator operator = OperatorFactory.createIndexOperator();
operator.batchUpdate(collection);
while (iterator.hasNext()) {
IndexMetaGroup<Long> metaGroup = iterator.next();
IndexOperator indexOperator = IndexOperator.newOperator(getDBConnector(metaGroup));
indexOperator.batchUpdate(metaGroup);
HealthCollector.getCurrentHeathReading("SpanDataConsumer")
.updateData(HeathReading.INFO, "%s messages were successful consumed .", data.size());
}
}
private IndexDBConnector getDBConnector(IndexMetaGroup<Long> metaGroup) {
return cache.get(metaGroup.getKey());
}
@Override
public void onError(List<SpanData> span, Throwable throwable) {
......@@ -57,7 +43,6 @@ public class SpanDataConsumer implements IConsumer<SpanData> {
@Override
public void onExit() {
cache.close();
fileWriter.close();
}
}
package com.a.eye.skywalking.storage.data;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.config.Constants;
import com.a.eye.skywalking.storage.data.exception.ConnectionNotFoundException;
import com.a.eye.skywalking.storage.data.file.DataFileReader;
import com.a.eye.skywalking.storage.data.index.*;
import com.a.eye.skywalking.storage.data.index.operator.FinderExecutor;
import com.a.eye.skywalking.storage.data.index.operator.IndexOperateExecutor;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_PASSWORD;
import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_USER;
import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class SpanDataFinder {
private static IndexDataSourceCache datasourceCache = new IndexDataSourceCache(Config.Finder.CACHED_SIZE);
private static ReentrantLock createDatasourceLock = new ReentrantLock();
public static List<SpanData> find(TraceId traceId) {
long blockIndex = BlockIndexEngine.newFinder().find(traceId.getSegments(1));
if (blockIndex == 0) {
return new ArrayList<SpanData>();
}
IndexDBConnector indexDBConnector = null;
IndexMetaCollection indexMetaCollection = null;
try {
indexDBConnector = fetchIndexDBConnector(blockIndex);
indexMetaCollection = indexDBConnector.queryByTraceId(traceId.getSegmentsList().toArray(new Long[traceId
.getSegmentsCount()]));
} finally {
if (indexDBConnector != null) {
indexDBConnector.close();
}
}
IndexMetaCollection indexMetaCollection = IndexOperateExecutor.execute(new FinderExecutor<IndexMetaCollection>(
traceId.getSegmentsList().toArray(new Long[traceId.getSegmentsCount()])));
if (indexMetaCollection == null) {
return new ArrayList<SpanData>();
}
Iterator<IndexMetaGroup<String>> iterator = IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder<String>() {
Iterator<IndexMetaGroup<String>> iterator =
IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder<String>() {
@Override
public String buildKey(IndexMetaInfo metaInfo) {
return metaInfo.getFileName().fileName();
......@@ -70,71 +45,4 @@ public class SpanDataFinder {
return result;
}
private static long[] spiltTraceId(String traceId){
return null;
}
private static IndexDBConnector fetchIndexDBConnector(long blockIndex) {
HikariDataSource datasource = getOrCreate(blockIndex);
IndexDBConnector indexDBConnector;
try {
indexDBConnector = new IndexDBConnector(datasource.getConnection());
} catch (SQLException e) {
throw new ConnectionNotFoundException("get connection failure.", e);
}
return indexDBConnector;
}
private static HikariDataSource getOrCreate(long blockIndex) {
HikariDataSource datasource = datasourceCache.get(blockIndex);
if (datasource == null) {
createDatasourceLock.lock();
try {
if (datasource == null) {
HikariConfig dataSourceConfig = generateDatasourceConfig(blockIndex);
datasource = new HikariDataSource(dataSourceConfig);
datasourceCache.put(blockIndex, datasource);
}
} finally {
createDatasourceLock.unlock();
}
}
return datasource;
}
private static HikariConfig generateDatasourceConfig(long blockIndex) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH), Config.DataIndex.FILE_NAME).generate(blockIndex));
config.setDriverClassName(Constants.DRIVER_CLASS_NAME);
config.setUsername(DEFAULT_USER);
config.setPassword(DEFAULT_PASSWORD);
config.setMaximumPoolSize(Config.Finder.DataSource.MAX_POOL_SIZE);
config.setMinimumIdle(Config.Finder.DataSource.MIN_IDLE);
return config;
}
private static long fetchStartTimeFromTraceId(String traceId) {
String[] traceIdSegment = traceId.split("\\.");
return Long.parseLong(traceIdSegment[traceIdSegment.length - 5]);
}
private static class IndexDataSourceCache extends LinkedHashMap<Long, HikariDataSource> {
private int cacheSize;
@Override
protected boolean removeEldestEntry(Map.Entry<Long, HikariDataSource> eldest) {
boolean removed = size() > cacheSize;
if (removed) {
eldest.getValue().close();
}
return removed;
}
public IndexDataSourceCache(int cacheSize) {
super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
this.cacheSize = cacheSize;
}
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/5.
*/
public class ConnectorInitializeFailedException extends RuntimeException {
public ConnectorInitializeFailedException(String message, Exception e) {
super(message, e);
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/5.
* Created by xin on 2016/11/20.
*/
public class ConnectionNotFoundException extends RuntimeException {
public ConnectionNotFoundException(String message, Exception e) {
public class IndexOperateFailedException extends RuntimeException {
public IndexOperateFailedException(String message, Exception e) {
super(message, e);
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/20.
*/
public class IndexOperatorInitializeFailedException extends RuntimeException {
public IndexOperatorInitializeFailedException(String message, Throwable cause) {
super(message, cause);
}
}
package com.a.eye.skywalking.storage.data.index;
/**
* Created by xin on 2016/11/13.
*/
public class ConnectURLGenerator {
private String basePath;
private String dbFileName;
public ConnectURLGenerator(String basePath, String dbFileName) {
this.basePath = basePath;
this.dbFileName = dbFileName;
}
public String generate(long timestamp) {
return "jdbc:hsqldb:file:" + basePath + "/" + timestamp + "/" + dbFileName;
}
}
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.config.Constants;
import com.a.eye.skywalking.storage.data.exception.ConnectorInitializeFailedException;
import com.a.eye.skywalking.storage.data.file.DataFileNameDesc;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanType;
import java.sql.*;
import static com.a.eye.skywalking.storage.config.Constants.SQL.*;
import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath;
/**
* Created by xin on 2016/11/4.
*/
public class IndexDBConnector {
private static ILog logger = LogManager.getLogger(IndexDBConnector.class);
static {
try {
Class.forName(Constants.DRIVER_CLASS_NAME);
} catch (ClassNotFoundException e) {
//never
}
}
private long timestamp;
private Connection connection;
private ConnectURLGenerator generator =
new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH), Config.DataIndex.FILE_NAME);
public IndexDBConnector(long timestamp) {
this.timestamp = timestamp;
createConnection();
createTableAndIndexIfNecessary();
}
public IndexDBConnector(Connection connection) {
this.connection = connection;
createTableAndIndexIfNecessary();
}
private void createTableAndIndexIfNecessary() {
try {
if (!tableExists()) {
createTable();
createIndex();
}
} catch (SQLException e) {
throw new ConnectorInitializeFailedException("Failed to create table and index.", e);
}
}
private void createConnection() {
try {
connection = DriverManager.getConnection(generator.generate(timestamp), DEFAULT_USER, DEFAULT_PASSWORD);
connection.setAutoCommit(true);
} catch (SQLException e) {
throw new ConnectorInitializeFailedException("Failed to create connection.", e);
}
}
private boolean tableExists() throws SQLException {
PreparedStatement ps = connection.prepareStatement(QUERY_TABLES);
ResultSet rs = ps.executeQuery();
rs.next();
boolean exists = rs.getInt("TABLE_COUNT") == 1;
rs.close();
ps.close();
return exists;
}
private void createTable() throws SQLException {
PreparedStatement ps = connection.prepareStatement(CREATE_TABLE);
ps.execute();
ps.close();
}
private void createIndex() throws SQLException {
PreparedStatement ps = connection.prepareStatement(CREATE_INDEX);
ps.execute();
ps.close();
}
public long getTimestamp() {
return timestamp;
}
public void batchUpdate(IndexMetaGroup<Long> metaGroup) throws SQLException {
int currentIndex = 0;
PreparedStatement ps = null;
try {
ps = connection.prepareStatement(INSERT_INDEX);
boolean isCommitted = false;
for (IndexMetaInfo metaInfo : metaGroup.getMetaInfo()) {
ps.setInt(1, metaInfo.getTraceId()[0].intValue());
ps.setLong(2, metaInfo.getTraceId()[1]);
ps.setInt(3, metaInfo.getTraceId()[2].intValue());
ps.setInt(4, metaInfo.getTraceId()[3].intValue());
ps.setInt(5, metaInfo.getTraceId()[4].intValue());
ps.setInt(6, metaInfo.getTraceId()[5].intValue());
ps.setInt(7, metaInfo.getSpanType().getValue());
ps.setLong(8, metaInfo.getFileName().getName());
ps.setInt(9, metaInfo.getFileName().getSuffix());
ps.setLong(10, metaInfo.getOffset());
ps.setInt(11, metaInfo.getLength());
ps.addBatch();
if (++currentIndex > Constants.MAX_BATCH_SIZE) {
ps.executeBatch();
isCommitted = true;
} else {
isCommitted = false;
}
}
if (!isCommitted) {
ps.executeBatch();
}
} finally {
if (ps != null)
ps.close();
}
}
public long fetchIndexSize() throws SQLException {
PreparedStatement ps = connection.prepareStatement(QUERY_INDEX_SIZE);
ResultSet rs = ps.executeQuery();
rs.next();
long indexSize = rs.getLong("INDEX_SIZE");
rs.close();
ps.close();
return indexSize;
}
public IndexMetaCollection queryByTraceId(Long[] traceId) {
try {
PreparedStatement ps = connection.prepareStatement(QUERY_TRACE_ID);
ps.setInt(1, traceId[0].intValue());
ps.setLong(2, traceId[1]);
ps.setInt(3, traceId[2].intValue());
ps.setInt(4, traceId[3].intValue());
ps.setInt(5, traceId[4].intValue());
ps.setInt(6, traceId[5].intValue());
ResultSet rs = ps.executeQuery();
IndexMetaCollection collection = new IndexMetaCollection();
while (rs.next()) {
SpanType spanType = SpanType.convert(rs.getInt("span_type"));
SpanData spanData = null;
if (SpanType.ACKSpan == spanType) {
spanData = new AckSpanData();
} else if (SpanType.RequestSpan == spanType) {
spanData = new RequestSpanData();
}
collection.add(new IndexMetaInfo(spanData,
new DataFileNameDesc(rs.getLong("file_name"), rs.getInt("file_name_suffix")),
rs.getLong("offset"), rs.getInt("length")));
}
return collection;
} catch (SQLException e) {
logger.error("Failed to query trace Id [{}]", traceId, e);
return new IndexMetaCollection();
}
}
public void close() {
try {
connection.close();
} catch (SQLException e) {
logger.error("Failed to close index db connector", e);
}
}
}
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.config.Config;
import java.util.LinkedHashMap;
import java.util.Map;
public class IndexDBConnectorCache {
private LRUCache cachedOperators;
public IndexDBConnectorCache() {
cachedOperators = new LRUCache(Config.DataIndex.Operator.CACHE_SIZE);
}
public IndexDBConnector get(long timestamp) {
IndexDBConnector connector = (IndexDBConnector) cachedOperators.get(timestamp);
if (connector == null) {
connector = new IndexDBConnector(timestamp);
updateCache(timestamp, connector);
}
return connector;
}
public void close(){
cachedOperators.close();
}
private void updateCache(long timestamp, IndexDBConnector operator) {
cachedOperators.put(timestamp, operator);
}
private void removedCache(IndexDBConnector connector) {
connector.close();
}
private class LRUCache extends LinkedHashMap<Long, IndexDBConnector> {
@Override
protected boolean removeEldestEntry(Map.Entry<Long, IndexDBConnector> eldest) {
boolean isNeedRemove = size() > Config.DataIndex.Operator.CACHE_SIZE;
if (isNeedRemove) {
removedCache(eldest.getValue());
}
return isNeedRemove;
}
public LRUCache(int cacheSize) {
super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
}
public void close(){
for(Map.Entry<Long, IndexDBConnector> entry : this.entrySet()){
entry.getValue().close();
}
}
}
}
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.block.index.BlockFinder;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
......
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.data.IndexDataCapacityMonitor;
import com.a.eye.skywalking.storage.data.exception.IndexMetaStoredFailedException;
public class IndexOperator {
private IndexDBConnector connector;
private long timestamp;
private IndexOperator(IndexDBConnector connector) {
this.connector = connector;
timestamp = connector.getTimestamp();
}
public void batchUpdate(IndexMetaGroup metaGroup) {
try {
connector.batchUpdate(metaGroup);
IndexDataCapacityMonitor.addIndexData(timestamp, metaGroup.size());
} catch (Exception e) {
throw new IndexMetaStoredFailedException("Failed to batch save index meta", e);
}
}
public static IndexOperator newOperator(IndexDBConnector indexDBConnector) {
return new IndexOperator(indexDBConnector);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
interface Executor<T> {
T execute(IndexOperator indexOperator);
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
/**
* Created by xin on 2016/11/20.
*/
public class FinderExecutor implements Executor<IndexMetaCollection> {
private Long[] traceIdSegment;
public FinderExecutor(Long[] traceIdSegment) {
this.traceIdSegment = traceIdSegment;
}
@Override
public com.a.eye.skywalking.storage.data.index.IndexMetaCollection execute(IndexOperator indexOperator) {
return indexOperator.findIndex(traceIdSegment);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.exception.IndexOperateFailedException;
import com.a.eye.skywalking.storage.data.index.operator.pool.IndexOperatorPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.transport.TransportClient;
public class IndexOperateExecutor {
private static IndexOperatorPool indexOperatorPool;
public static <T> T execute(Executor<T> executor) {
TransportClient client = null;
try {
client = indexOperatorPool.borrowObject();
return executor.execute(new IndexOperatorImpl(client));
} catch (Exception e) {
throw new IndexOperateFailedException("Index operate failed.", e);
} finally {
indexOperatorPool.returnObject(client);
}
}
static {
initializeIndexOperatorPool();
}
private static void initializeIndexOperatorPool() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(Config.IndexOperator.Finder.TOTAL);
poolConfig.setMaxIdle(Config.IndexOperator.Finder.IDEL);
poolConfig.setTestOnBorrow(true);
indexOperatorPool = new IndexOperatorPool(poolConfig);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
public interface IndexOperator {
void batchUpdate(IndexMetaCollection metaInfos);
IndexMetaCollection findIndex(Long[] traceId);
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class IndexOperatorImpl implements IndexOperator {
private static ILog logger = LogManager.getLogger(IndexOperatorImpl.class);
private TransportClient client;
public IndexOperatorImpl(TransportClient client) {
this.client = client;
}
@Override
public void batchUpdate(IndexMetaCollection metaInfos) {
BulkRequestBuilder requestBuilder = client.prepareBulk();
for (IndexMetaInfo indexMetaInfo : metaInfos) {
try {
requestBuilder.add(client.prepareIndex("skywalking", "index").setSource(buildSource(indexMetaInfo)));
} catch (Exception e) {
logger.error("Failed to update index.", e);
HealthCollector.getCurrentHeathReading("IndexOperator")
.updateData(HeathReading.ERROR, "Failed to " + "update index.");
}
}
BulkResponse bulkRequest = requestBuilder.get();
if (bulkRequest.hasFailures()) {
HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR,
"Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage());
}
}
private XContentBuilder buildSource(IndexMetaInfo indexMetaInfo) throws IOException {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("traceid_s0", indexMetaInfo.getTraceId()[0])
.field("traceid_s1", indexMetaInfo.getTraceId()[1]).field("traceid_s2", indexMetaInfo.getTraceId()[2])
.field("traceid_s3", indexMetaInfo.getTraceId()[3]).field("traceid_s4", indexMetaInfo.getTraceId()[4])
.field("traceid_s5", indexMetaInfo.getTraceId()[5]).field("fileName", indexMetaInfo.getFileName())
.field("offset", indexMetaInfo.getOffset()).field("length", indexMetaInfo.getLength()).endObject();
return xContentBuilder;
}
@Override
public IndexMetaCollection findIndex(Long[] traceId) {
return null;
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.exception.IndexOperatorInitializeFailedException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
public class OperatorFactory {
public static IndexOperator createIndexOperator() {
try {
return new IndexOperatorImpl(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT)));
} catch (Exception e) {
throw new IndexOperatorInitializeFailedException("Failed to initialze operator.", e);
}
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
/**
* Created by xin on 2016/11/20.
*/
public class UpdateExecutor implements Executor<Integer> {
private IndexMetaCollection metaCollection;
public UpdateExecutor(IndexMetaCollection metaCollection) {
this.metaCollection = metaCollection;
}
@Override
public IndexMetaCollection execute(IndexOperator indexOperator) {
return null;
}
}
package com.a.eye.skywalking.storage.data.index.operator.pool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.transport.TransportClient;
public class IndexOperatorPool extends GenericObjectPool<TransportClient> {
public IndexOperatorPool(GenericObjectPoolConfig poolConfig) {
super(new IndexOperatorPooledObjectFactory(), poolConfig);
}
}
package com.a.eye.skywalking.storage.data.index.operator.pool;
import com.a.eye.skywalking.storage.config.Config;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
public class IndexOperatorPooledObjectFactory extends BasePooledObjectFactory<TransportClient> {
@Override
public TransportClient create() throws Exception {
return new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT));
}
@Override
public PooledObject<TransportClient> wrap(org.elasticsearch.client.transport.TransportClient client) {
return new DefaultPooledObject<>(client);
}
@Override
public void destroyObject(PooledObject<TransportClient> p) throws Exception {
super.destroyObject(p);
}
}
......@@ -10,29 +10,11 @@ server.port=34000
# the size of data consumer
#dataconsumer.consumer_size = 5
#
# the path that storage block index
#blockindex.path=/block-index
#
# the name of file which storage block index
#blockindex.file_name=data_file.index
#
# the path that storage data file
#datafile.path=/data/file
#
# the max size of data file (byte)
datafile.size=50000000
#
# the path that storage data index
#dataindex.path=/data/index
#
# the name of the file which storage data index
#dataindex.file_name=dataIndex
#
# the max size of data index
dataindex.size=1000000
#
# the cached size of data index operator
#dataindex.operator.cache_size=10
#datafile.size=50000000
#
# the auth info which registry center
#registrycenter.auth_info=
......@@ -57,3 +39,12 @@ registrycenter.connect_url=127.0.0.1:2181
#
# the min idle of data source that finder operate
#finder.datasource.min_idle=5
#
# the port which data index listening
DataIndex.INDEX_LISTEN_PORT=9300
#
# the total size of finder
#indexoperator.finder.total=50
#
# the max idel of finder
#indexoperator.finder.idel=20
......@@ -9,7 +9,7 @@ import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub;
public class StorageClient {
private static ManagedChannel channel =
ManagedChannelBuilder.forAddress("10.128.35.79", 34000).usePlaintext(true).build();
ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build();
private static SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageServiceStub = newStub(channel);
......@@ -21,20 +21,21 @@ public class StorageClient {
public static void main(String[] args) throws InterruptedException {
RequestSpan requestSpan =
RequestSpan.newBuilder().setSpanType(1).setAddress("127.0.0.1").setApplicationId("1").setCallType("1")
.setLevelId(0).setProcessNo("19287").setStartDate(System.currentTimeMillis()).setTraceId(
.setParentLevel("0.0.0").setLevelId(0).setProcessNo("19287")
.setStartDate(System.currentTimeMillis()).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1")
.setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
.setViewPointId("com.a.eye.skywalking.storage.block.index.BlockIndexEngine.newFinder").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
AckSpan ackSpan = AckSpan.newBuilder().setParentLevel("0.0.0").setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setStatusCode(0)
.setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
.setViewpointId("com.a.eye.skywalking.storage.block.index.BlockIndexEngine.newFinder").build();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
for (int i = 0; i < 10000; i++) {
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageServiceStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
......@@ -86,7 +87,7 @@ public class StorageClient {
requestSpanStreamObserver.onCompleted();
if(i % 500_000 == 0){
if (i % 500_000 == 0) {
System.out.println(i);
}
......
......@@ -36,7 +36,7 @@ public class SearchClient {
StreamObserver<QueryTask> searchResult = searchServiceStub.search(serverStreamObserver);
searchResult.onNext(QueryTask.newBuilder().setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828)
TraceId.newBuilder().addSegments(201611).addSegments(1479267274243L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setTaskId(1).build());
searchResult.onCompleted();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册