diff --git a/skywalking-storage-center/skywalking-storage/pom.xml b/skywalking-storage-center/skywalking-storage/pom.xml index 0b101ebcd2e0195e795acb0704e4f284c5ad58b3..0fd0aad040e02f6bb51066573f97a2f93709dc40 100644 --- a/skywalking-storage-center/skywalking-storage/pom.xml +++ b/skywalking-storage-center/skywalking-storage/pom.xml @@ -41,6 +41,16 @@ 2.4.3 + + org.apache.commons + commons-pool2 + 2.4.2 + + + org.elasticsearch.client + transport + 5.0.1 + diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java index 1fb8acfaca230ac0fa165ab0918eb05c2d5bdef3..0fe6b93214f9dadb4314e521f0f8a8fc63f9721b 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java @@ -9,10 +9,8 @@ import com.a.eye.skywalking.registry.RegistryCenterFactory; import com.a.eye.skywalking.registry.api.CenterType; import com.a.eye.skywalking.registry.api.RegistryCenter; import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig; -import com.a.eye.skywalking.storage.block.index.BlockIndexEngine; import com.a.eye.skywalking.storage.config.Config; import com.a.eye.skywalking.storage.config.ConfigInitializer; -import com.a.eye.skywalking.storage.data.IndexDataCapacityMonitor; import com.a.eye.skywalking.storage.data.file.DataFilesManager; import com.a.eye.skywalking.storage.listener.SearchListener; import com.a.eye.skywalking.storage.listener.StorageListener; @@ -46,9 +44,6 @@ public class Main { DataFilesManager.init(); - BlockIndexEngine.start(); - - IndexDataCapacityMonitor.start(); provider = ServiceProvider.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener()) .addAsyncTraceSearchService(new SearchListener()).build(); @@ -67,7 +62,6 @@ public class Main { logger.error("SkyWalking storage server start failure.", e); } finally { provider.stop(); - IndexDataCapacityMonitor.stop(); } } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockFinder.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockFinder.java deleted file mode 100644 index 165d6e0b6ec26ef42d34e0c8a7e0b79217330658..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockFinder.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.a.eye.skywalking.storage.block.index; - -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; - -/** - * Created by xin on 2016/11/2. - */ -public class BlockFinder { - - private static ILog logger = LogManager.getLogger(BlockFinder.class); - - private L1Cache l1Cache; - private L2Cache l2Cache; - - public BlockFinder(L1Cache l1Cache, L2Cache l2Cache) { - this.l1Cache = l1Cache; - this.l2Cache = l2Cache; - } - - public long find(long timestamp) { - Long index = l1Cache.find(timestamp); - if (index == null) { - index = l2Cache.find(timestamp); - } - - return index; - } - - - public long findLastBlockIndex() { - return l2Cache.getLastBlockIndex(); - } - -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexEngine.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexEngine.java deleted file mode 100644 index 386c45126462f04c008020ca309838a9a2e4d245..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexEngine.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.a.eye.skywalking.storage.block.index; - - -public class BlockIndexEngine { - - private static L1Cache l1Cache; - private static L2Cache l2Cache; - - public static void start(){ - l1Cache = new L1Cache(); - l2Cache = new L2Cache(); - newUpdator().init(); - } - - public static BlockFinder newFinder() { - return new BlockFinder(l1Cache, l2Cache); - } - - - public static BlockIndexUpdator newUpdator() { - return new BlockIndexUpdator(l1Cache, l2Cache); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexUpdator.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexUpdator.java deleted file mode 100644 index 753b60a503a741d91c6287f8df440c7ca799b810..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/BlockIndexUpdator.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.a.eye.skywalking.storage.block.index; - -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.storage.block.index.exception.BlockIndexPersistenceFailedException; - -import java.io.*; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static com.a.eye.skywalking.storage.config.Config.BlockIndex.FILE_NAME; -import static com.a.eye.skywalking.storage.config.Config.BlockIndex.PATH; -import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath; - -public class BlockIndexUpdator { - - private static ILog logger = LogManager.getLogger(BlockIndexUpdator.class); - private L1Cache l1Cache; - private L2Cache l2Cache; - - public BlockIndexUpdator(L1Cache l1Cache, L2Cache l2Cache) { - this.l1Cache = l1Cache; - this.l2Cache = l2Cache; - } - - public void addRecord(long timestamp) { - logger.info("Updating block index. index key:{}", timestamp); - try { - updateFile(timestamp); - updateCache(timestamp); - } catch (Exception e) { - logger.error("Failed to add block index record", e); - } - } - - private void updateCache(long timestamp) { - l1Cache.add2Rebuild(timestamp); - l2Cache.add2Rebuild(timestamp); - } - - - private void updateFile(long timestamp) throws BlockIndexPersistenceFailedException { - BufferedWriter writer = null; - try { - File blockIndexFile = getOrCreateBlockIndexFile(); - writer = new BufferedWriter(new FileWriter(blockIndexFile)); - writer.write(String.valueOf(timestamp)); - writer.newLine(); - writer.close(); - } catch (IOException e) { - throw new BlockIndexPersistenceFailedException("Failed to save index[" + timestamp + "]", e); - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException e) { - logger.error("Failed to close index file", e); - } - } - } - } - - - void init() { - List indexData = new ArrayList<>(); - BufferedReader indexFileReader = null; - try { - File blockIndexFile = getOrCreateBlockIndexFile(); - indexFileReader = new BufferedReader(new FileReader(blockIndexFile)); - String indexDataStr = null; - while ((indexDataStr = indexFileReader.readLine()) != null) { - indexData.add(Long.parseLong(indexDataStr)); - } - } catch (IOException e) { - logger.error("Failed to read index data.", e); - } finally { - if (indexFileReader != null) { - try { - indexFileReader.close(); - } catch (IOException e) { - logger.error("Failed to close index file", e); - } - } - } - - - if (indexData.size() == 0) { - if (logger.isDebugEnable()) { - logger.debug("Any block index was not founded. will add new block index.", indexData.size()); - } - //如果此前没有记录,则取之前五分钟到目前的数据 - addRecord(System.currentTimeMillis() - 5 * 60 * 1000); - return; - } - - if (logger.isDebugEnable()) { - logger.debug("There are {} block index was founded. Begin to init L1Cache and L2Cache", indexData.size()); - } - - Collections.reverse(indexData); - l1Cache.init(indexData); - l2Cache.init(indexData); - } - - public File getOrCreateBlockIndexFile() throws IOException { - if (logger.isDebugEnable()) { - - } - File blockIndexFile = new File(getAbsolutePath(PATH), FILE_NAME); - - if (!blockIndexFile.getParentFile().exists()) { - blockIndexFile.getParentFile().mkdirs(); - } - - if (!blockIndexFile.exists()) { - blockIndexFile.createNewFile(); - } - return blockIndexFile; - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L1Cache.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L1Cache.java deleted file mode 100644 index ad9e5b1ca40fd83fa4136e9333c26ef94cdffb98..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L1Cache.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.a.eye.skywalking.storage.block.index; - -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; - -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static com.a.eye.skywalking.storage.config.Config.BlockIndexEngine.L1_CACHE_SIZE; - -/** - * 块索引的一级缓存 - *

- * Created by xin on 2016/11/2. - */ -public class L1Cache { - - private static ILog logger = LogManager.getLogger(L1Cache.class); - private TreeSet cacheData = new TreeSet(); - private final ReadWriteLock updateLock = new ReentrantReadWriteLock(); - - void init(List data) { - StringBuilder initData = new StringBuilder(); - int size = data.size() > L1_CACHE_SIZE ? L1_CACHE_SIZE : data.size(); - for (int i = 0; i < size; i++) { - this.cacheData.add(data.get(i)); - initData.append(data.get(i)).append(","); - } - - if (logger.isDebugEnable()) { - logger.info("L1 cache init data : [{}]", initData.deleteCharAt(initData.length() - 1)); - } - } - - public Long find(long timestamp) { - Lock lock = updateLock.readLock(); - try { - lock.lock(); - return cacheData.higher(timestamp); - } finally { - lock.unlock(); - } - } - - public void add2Rebuild(long timestamp) { - TreeSet newCacheData = new TreeSet<>(cacheData); - newCacheData.add(timestamp); - - if (newCacheData.size() >= L1_CACHE_SIZE + 1) { - long removedData = newCacheData.pollFirst(); - logger.info("Add cache data : {}, removed cache Data:{}", timestamp, removedData); - } - - Lock lock = updateLock.writeLock(); - try { - lock.lock(); - cacheData = newCacheData; - } finally { - lock.unlock(); - } - } - -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L2Cache.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L2Cache.java deleted file mode 100644 index 81975f04def2797ed6ec98598844c1770124dd03..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/L2Cache.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.a.eye.skywalking.storage.block.index; - -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; - -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * 块索引的二级缓存 - */ -public class L2Cache { - - private ILog logger = LogManager.getLogger(L2Cache.class); - - private TreeSet cacheData = new TreeSet(); - private ReadWriteLock updateLock = new ReentrantReadWriteLock(); - - void init(List data) { - this.cacheData.addAll(data); - if (logger.isDebugEnable()) { - logger.info("L2 cache init data : {}", data); - } - } - - public Long find(long timestamp) { - Lock lock = updateLock.readLock(); - try { - lock.lock(); - return this.cacheData.higher(timestamp); - } finally { - lock.unlock(); - } - } - - public void add2Rebuild(long timestamp) { - TreeSet newCacheData = new TreeSet<>(cacheData); - newCacheData.add(timestamp); - Lock lock = updateLock.writeLock(); - - try { - lock.lock(); - cacheData.add(timestamp); - } finally { - lock.unlock(); - } - } - - public long getLastBlockIndex() { - return cacheData.last(); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/exception/BlockIndexPersistenceFailedException.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/exception/BlockIndexPersistenceFailedException.java deleted file mode 100644 index 03b2ef1690ec4ef2062045cc1a56369770e83ab6..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/block/index/exception/BlockIndexPersistenceFailedException.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.a.eye.skywalking.storage.block.index.exception; - -/** - * Created by xin on 2016/11/2. - */ -public class BlockIndexPersistenceFailedException extends Exception { - public BlockIndexPersistenceFailedException(String message, Exception e) { - super(message, e); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java index 477e744e7f0110dbfcf2531009f966e47ad9a0d9..d2c520e42271acfd7360df629df2ef97beb0a7cb 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java @@ -8,23 +8,17 @@ public class Config { public static int PORT = 34000; } + public static class DataConsumer { - public static int CHANNEL_SIZE = 10; + public static int CHANNEL_SIZE = 10; - public static int BUFFER_SIZE = 1000; + public static int BUFFER_SIZE = 1000; public static int CONSUMER_SIZE = 5; } - public static class BlockIndex { - public static String PATH = "/block_index"; - - public static String FILE_NAME = "data_file.index"; - } - - public static class DataFile { public static String PATH = "/data/file"; @@ -34,21 +28,19 @@ public class Config { public static class DataIndex { - public static String PATH = "/data/index"; + public static final int INDEX_LISTEN_PORT = 9300; - public static String FILE_NAME = "dataIndex"; + } - public static long SIZE = 1000 * 1000 * 1000; + public static class IndexOperator { - public static class Operator { - public static int CACHE_SIZE = 5; - } - } + public static class Finder { + public static int TOTAL = 50; - public static class BlockIndexEngine { - public static int L1_CACHE_SIZE = 10; + public static int IDEL = 20; + } } @@ -63,14 +55,4 @@ public class Config { public static String PATH_PREFIX = "/skywalking/storage_list/"; } - - public static class Finder { - public static int CACHED_SIZE = 10; - - - public static class DataSource { - public static int MAX_POOL_SIZE = 20; - public static int MIN_IDLE = 5; - } - } } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Constants.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Constants.java index 8a7e933b389d6a8569c2a6abe674db749a5dca76..b0da8231b644b25c1ac0e9a82b2a3c5bcbc7b594 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Constants.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Constants.java @@ -8,19 +8,19 @@ public class Constants { public static int MAX_BATCH_SIZE = 50; public static class SQL { - public static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "\n" + "(\n" - + " id INT PRIMARY KEY NOT NULL IDENTITY,\n" - + " tid_s0 INT NOT NULL,\n" - + " tid_s1 BIGINT NOT NULL,\n" - + " tid_s2 INT NOT NULL,\n" - + " tid_s3 INT NOT NULL,\n" - + " tid_s4 INT NOT NULL,\n" - + " tid_s5 INT NOT NULL,\n" - + " span_type INT NOT NULL, \n" - + " file_name BIGINT NOT NULL,\n" - + " file_name_suffix INT NOT NULL,\n" - + " offset BIGINT NOT NULL,\n" - + " length INT NOT NULL\n" + ");\n"; + public static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "( " + + " id INT PRIMARY KEY NOT NULL IDENTITY, " + + " tid_s0 INT NOT NULL, " + + " tid_s1 BIGINT NOT NULL, " + + " tid_s2 INT NOT NULL, " + + " tid_s3 INT NOT NULL, " + + " tid_s4 INT NOT NULL, " + + " tid_s5 INT NOT NULL, " + + " span_type INT NOT NULL, " + + " file_name BIGINT NOT NULL, " + + " file_name_suffix INT NOT NULL, " + + " offset BIGINT NOT NULL, " + + " length INT NOT NULL " + "); "; public static final String CREATE_INDEX = "CREATE INDEX \"index_data_trace_id_index\" ON " + TABLE_NAME + " " + "(tid_s0,tid_s1,tid_s2,tid_s3,tid_s4,tid_s5);"; diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/IndexDataCapacityMonitor.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/IndexDataCapacityMonitor.java deleted file mode 100644 index c3f3062bd632b3277f1b56a3fdb8ab71023c1dd1..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/IndexDataCapacityMonitor.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.a.eye.skywalking.storage.data; - -import com.a.eye.skywalking.health.report.HealthCollector; -import com.a.eye.skywalking.health.report.HeathReading; -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.storage.block.index.BlockIndexEngine; -import com.a.eye.skywalking.storage.data.index.IndexDBConnector; - -import java.sql.SQLException; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static com.a.eye.skywalking.storage.config.Config.DataIndex.SIZE; - -/** - * Created by xin on 2016/11/6. - */ -public class IndexDataCapacityMonitor { - - private static ILog logger = LogManager.getLogger(IndexDataCapacityMonitor.class); - private static Detector detector; - - public static void addIndexData(long timestamp, int size) { - if (detector.isDetectFor(timestamp)) { - detector.add(size); - } - } - - private static class Detector extends TimerTask { - - private AtomicLong currentSize; - private long timestamp; - private Timer timer = new Timer(); - - public Detector(long timestamp) { - this.timestamp = timestamp; - currentSize = new AtomicLong(); - start(); - } - - public Detector(long timestamp, long currentSize) { - this.currentSize = new AtomicLong(currentSize); - this.timestamp = timestamp; - start(); - } - - public void start() { - timer.schedule(this, 0, TimeUnit.SECONDS.toMillis(30)); - } - - public void stop() { - timer.cancel(); - } - - public boolean isDetectFor(long timestamp) { - return this.timestamp == timestamp; - } - - public void add(int updateRecordSize) { - currentSize.addAndGet(updateRecordSize); - } - - @Override - public void run() { - if (currentSize.get() > SIZE * 0.8) { - stop(); - notificationAddNewBlockIndexAndCreateNewIndexDB(); - HealthCollector.getCurrentHeathReading("Index Data Capacity Detector").updateData(HeathReading.INFO, - "Detector is detecting the index [%d]. and the capacity of {} is %d", timestamp, - currentSize.get()); - } - } - } - - private static void notificationAddNewBlockIndexAndCreateNewIndexDB() { - long timestamp = System.currentTimeMillis() + 5 * 60 * 1000; - BlockIndexEngine.newUpdator().addRecord(timestamp); - logger.info("Create a new Index DB [{}]", timestamp); - createNewIndexDB(timestamp); - detector = new Detector(timestamp); - } - - private static void createNewIndexDB(long timestamp) { - IndexDBConnector connector = new IndexDBConnector(timestamp); - connector.close(); - } - - public static void start() { - long timestamp = BlockIndexEngine.newFinder().findLastBlockIndex(); - - IndexDBConnector dbConnector = null; - try { - dbConnector = new IndexDBConnector(timestamp); - long count = 0; - try { - count = dbConnector.fetchIndexSize(); - } catch (SQLException e) { - logger.error("Failed to to fetch index size from DB:{}", timestamp, e); - } - detector = new Detector(timestamp, count); - logger.info("Index data capacity monitor started successfully!"); - } finally { - if (dbConnector != null) { - dbConnector.close(); - } - } - } - - - public static void stop() { - detector.stop(); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java index 5d5e8336cff1e6cf6c65f7e0028cc30841e5f4b4..c15df0580c49dd5cbc543d3c54f39c6933e28551 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java @@ -5,47 +5,33 @@ import com.a.eye.skywalking.health.report.HealthCollector; import com.a.eye.skywalking.health.report.HeathReading; import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.storage.block.index.BlockIndexEngine; import com.a.eye.skywalking.storage.data.file.DataFileWriter; -import com.a.eye.skywalking.storage.data.index.*; +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; +import com.a.eye.skywalking.storage.data.index.operator.IndexOperator; +import com.a.eye.skywalking.storage.data.index.operator.OperatorFactory; import com.a.eye.skywalking.storage.data.spandata.SpanData; -import java.util.Iterator; import java.util.List; public class SpanDataConsumer implements IConsumer { private static ILog logger = LogManager.getLogger(SpanDataConsumer.class); - private IndexDBConnectorCache cache; - private DataFileWriter fileWriter; + private DataFileWriter fileWriter; @Override public void init() { - cache = new IndexDBConnectorCache(); fileWriter = new DataFileWriter(); } @Override public void consume(List data) { - Iterator> iterator = - IndexMetaCollections.group(fileWriter.write(data), new GroupKeyBuilder() { - @Override - public Long buildKey(IndexMetaInfo metaInfo) { - return BlockIndexEngine.newFinder().find(metaInfo.getTraceStartTime()); - } - }).iterator(); - - while (iterator.hasNext()) { - IndexMetaGroup metaGroup = iterator.next(); - IndexOperator indexOperator = IndexOperator.newOperator(getDBConnector(metaGroup)); - indexOperator.batchUpdate(metaGroup); - HealthCollector.getCurrentHeathReading("SpanDataConsumer") - .updateData(HeathReading.INFO, "%s messages were successful consumed .", data.size()); - } - } + IndexMetaCollection collection = fileWriter.write(data); + + IndexOperator operator = OperatorFactory.createIndexOperator(); + operator.batchUpdate(collection); - private IndexDBConnector getDBConnector(IndexMetaGroup metaGroup) { - return cache.get(metaGroup.getKey()); + HealthCollector.getCurrentHeathReading("SpanDataConsumer") + .updateData(HeathReading.INFO, "%s messages were successful consumed .", data.size()); } @Override @@ -57,7 +43,6 @@ public class SpanDataConsumer implements IConsumer { @Override public void onExit() { - cache.close(); fileWriter.close(); } } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataFinder.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataFinder.java index 7fe81f0fca1cbeeb9ff8421dadc8acbf25a81c50..34b5688396dd88b8fc2dad7ca5ae33bf35a14278 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataFinder.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataFinder.java @@ -1,58 +1,33 @@ package com.a.eye.skywalking.storage.data; -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.network.grpc.TraceId; -import com.a.eye.skywalking.storage.block.index.BlockIndexEngine; -import com.a.eye.skywalking.storage.config.Config; -import com.a.eye.skywalking.storage.config.Constants; -import com.a.eye.skywalking.storage.data.exception.ConnectionNotFoundException; import com.a.eye.skywalking.storage.data.file.DataFileReader; import com.a.eye.skywalking.storage.data.index.*; +import com.a.eye.skywalking.storage.data.index.operator.FinderExecutor; +import com.a.eye.skywalking.storage.data.index.operator.IndexOperateExecutor; import com.a.eye.skywalking.storage.data.spandata.SpanData; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import java.sql.SQLException; -import java.util.*; -import java.util.concurrent.locks.ReentrantLock; - -import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_PASSWORD; -import static com.a.eye.skywalking.storage.config.Constants.SQL.DEFAULT_USER; -import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; public class SpanDataFinder { - private static IndexDataSourceCache datasourceCache = new IndexDataSourceCache(Config.Finder.CACHED_SIZE); - private static ReentrantLock createDatasourceLock = new ReentrantLock(); public static List find(TraceId traceId) { - long blockIndex = BlockIndexEngine.newFinder().find(traceId.getSegments(1)); - if (blockIndex == 0) { - return new ArrayList(); - } - - IndexDBConnector indexDBConnector = null; - IndexMetaCollection indexMetaCollection = null; - try { - indexDBConnector = fetchIndexDBConnector(blockIndex); - indexMetaCollection = indexDBConnector.queryByTraceId(traceId.getSegmentsList().toArray(new Long[traceId - .getSegmentsCount()])); - } finally { - if (indexDBConnector != null) { - indexDBConnector.close(); - } - } + IndexMetaCollection indexMetaCollection = IndexOperateExecutor.execute(new FinderExecutor( + traceId.getSegmentsList().toArray(new Long[traceId.getSegmentsCount()]))); if (indexMetaCollection == null) { return new ArrayList(); } - Iterator> iterator = IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder() { - @Override - public String buildKey(IndexMetaInfo metaInfo) { - return metaInfo.getFileName().fileName(); - } - }).iterator(); + Iterator> iterator = + IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder() { + @Override + public String buildKey(IndexMetaInfo metaInfo) { + return metaInfo.getFileName().fileName(); + } + }).iterator(); List result = new ArrayList(); while (iterator.hasNext()) { @@ -70,71 +45,4 @@ public class SpanDataFinder { return result; } - - private static long[] spiltTraceId(String traceId){ - return null; - } - - private static IndexDBConnector fetchIndexDBConnector(long blockIndex) { - HikariDataSource datasource = getOrCreate(blockIndex); - IndexDBConnector indexDBConnector; - try { - indexDBConnector = new IndexDBConnector(datasource.getConnection()); - } catch (SQLException e) { - throw new ConnectionNotFoundException("get connection failure.", e); - } - return indexDBConnector; - } - - private static HikariDataSource getOrCreate(long blockIndex) { - HikariDataSource datasource = datasourceCache.get(blockIndex); - if (datasource == null) { - createDatasourceLock.lock(); - try { - if (datasource == null) { - HikariConfig dataSourceConfig = generateDatasourceConfig(blockIndex); - datasource = new HikariDataSource(dataSourceConfig); - datasourceCache.put(blockIndex, datasource); - } - } finally { - createDatasourceLock.unlock(); - } - } - return datasource; - } - - private static HikariConfig generateDatasourceConfig(long blockIndex) { - HikariConfig config = new HikariConfig(); - config.setJdbcUrl(new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH), Config.DataIndex.FILE_NAME).generate(blockIndex)); - config.setDriverClassName(Constants.DRIVER_CLASS_NAME); - config.setUsername(DEFAULT_USER); - config.setPassword(DEFAULT_PASSWORD); - config.setMaximumPoolSize(Config.Finder.DataSource.MAX_POOL_SIZE); - config.setMinimumIdle(Config.Finder.DataSource.MIN_IDLE); - return config; - } - - private static long fetchStartTimeFromTraceId(String traceId) { - String[] traceIdSegment = traceId.split("\\."); - return Long.parseLong(traceIdSegment[traceIdSegment.length - 5]); - } - - private static class IndexDataSourceCache extends LinkedHashMap { - - private int cacheSize; - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - boolean removed = size() > cacheSize; - if (removed) { - eldest.getValue().close(); - } - return removed; - } - - public IndexDataSourceCache(int cacheSize) { - super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true); - this.cacheSize = cacheSize; - } - } } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectionNotFoundException.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectionNotFoundException.java deleted file mode 100644 index 393d01ec1a7d4b01552774656364e87939ad2763..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectionNotFoundException.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.a.eye.skywalking.storage.data.exception; - -/** - * Created by xin on 2016/11/5. - */ -public class ConnectionNotFoundException extends RuntimeException { - public ConnectionNotFoundException(String message, Exception e) { - super(message, e); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectorInitializeFailedException.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectorInitializeFailedException.java deleted file mode 100644 index 3d276aed3850efe0236d37e993af4ff765469be5..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/ConnectorInitializeFailedException.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.a.eye.skywalking.storage.data.exception; - -/** - * Created by xin on 2016/11/5. - */ -public class ConnectorInitializeFailedException extends RuntimeException { - public ConnectorInitializeFailedException(String message, Exception e) { - super(message, e); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperateFailedException.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperateFailedException.java new file mode 100644 index 0000000000000000000000000000000000000000..aa7e0f4794681f6ad0f25e0ea777ecd60d3b9622 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperateFailedException.java @@ -0,0 +1,10 @@ +package com.a.eye.skywalking.storage.data.exception; + +/** + * Created by xin on 2016/11/20. + */ +public class IndexOperateFailedException extends RuntimeException { + public IndexOperateFailedException(String message, Exception e) { + super(message, e); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperatorInitializeFailedException.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperatorInitializeFailedException.java new file mode 100644 index 0000000000000000000000000000000000000000..31444cd0462aec6d67126b8892fe6f497d785314 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/exception/IndexOperatorInitializeFailedException.java @@ -0,0 +1,10 @@ +package com.a.eye.skywalking.storage.data.exception; + +/** + * Created by xin on 2016/11/20. + */ +public class IndexOperatorInitializeFailedException extends RuntimeException { + public IndexOperatorInitializeFailedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/ConnectURLGenerator.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/ConnectURLGenerator.java deleted file mode 100644 index 14ad9cf4ac7c9e2ef065293b85415019aa05c2bc..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/ConnectURLGenerator.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.a.eye.skywalking.storage.data.index; - -/** - * Created by xin on 2016/11/13. - */ -public class ConnectURLGenerator { - - private String basePath; - private String dbFileName; - - public ConnectURLGenerator(String basePath, String dbFileName) { - this.basePath = basePath; - this.dbFileName = dbFileName; - } - - - public String generate(long timestamp) { - return "jdbc:hsqldb:file:" + basePath + "/" + timestamp + "/" + dbFileName; - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnector.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnector.java deleted file mode 100644 index 802e6f436c5d77ad2687851287ef6ff8c0d95398..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnector.java +++ /dev/null @@ -1,191 +0,0 @@ -package com.a.eye.skywalking.storage.data.index; - -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.storage.config.Config; -import com.a.eye.skywalking.storage.config.Constants; -import com.a.eye.skywalking.storage.data.exception.ConnectorInitializeFailedException; -import com.a.eye.skywalking.storage.data.file.DataFileNameDesc; -import com.a.eye.skywalking.storage.data.spandata.AckSpanData; -import com.a.eye.skywalking.storage.data.spandata.RequestSpanData; -import com.a.eye.skywalking.storage.data.spandata.SpanData; -import com.a.eye.skywalking.storage.data.spandata.SpanType; - -import java.sql.*; - -import static com.a.eye.skywalking.storage.config.Constants.SQL.*; -import static com.a.eye.skywalking.storage.util.PathResolver.getAbsolutePath; - -/** - * Created by xin on 2016/11/4. - */ -public class IndexDBConnector { - - private static ILog logger = LogManager.getLogger(IndexDBConnector.class); - - static { - try { - Class.forName(Constants.DRIVER_CLASS_NAME); - } catch (ClassNotFoundException e) { - //never - } - } - - private long timestamp; - private Connection connection; - private ConnectURLGenerator generator = - new ConnectURLGenerator(getAbsolutePath(Config.DataIndex.PATH), Config.DataIndex.FILE_NAME); - - public IndexDBConnector(long timestamp) { - this.timestamp = timestamp; - createConnection(); - createTableAndIndexIfNecessary(); - } - - public IndexDBConnector(Connection connection) { - this.connection = connection; - createTableAndIndexIfNecessary(); - } - - private void createTableAndIndexIfNecessary() { - try { - if (!tableExists()) { - createTable(); - createIndex(); - } - } catch (SQLException e) { - throw new ConnectorInitializeFailedException("Failed to create table and index.", e); - } - } - - - private void createConnection() { - try { - connection = DriverManager.getConnection(generator.generate(timestamp), DEFAULT_USER, DEFAULT_PASSWORD); - connection.setAutoCommit(true); - } catch (SQLException e) { - throw new ConnectorInitializeFailedException("Failed to create connection.", e); - } - } - - private boolean tableExists() throws SQLException { - PreparedStatement ps = connection.prepareStatement(QUERY_TABLES); - ResultSet rs = ps.executeQuery(); - rs.next(); - - boolean exists = rs.getInt("TABLE_COUNT") == 1; - rs.close(); - ps.close(); - - return exists; - } - - private void createTable() throws SQLException { - PreparedStatement ps = connection.prepareStatement(CREATE_TABLE); - ps.execute(); - ps.close(); - } - - private void createIndex() throws SQLException { - PreparedStatement ps = connection.prepareStatement(CREATE_INDEX); - ps.execute(); - ps.close(); - } - - public long getTimestamp() { - return timestamp; - } - - public void batchUpdate(IndexMetaGroup metaGroup) throws SQLException { - int currentIndex = 0; - PreparedStatement ps = null; - try { - ps = connection.prepareStatement(INSERT_INDEX); - boolean isCommitted = false; - for (IndexMetaInfo metaInfo : metaGroup.getMetaInfo()) { - ps.setInt(1, metaInfo.getTraceId()[0].intValue()); - ps.setLong(2, metaInfo.getTraceId()[1]); - ps.setInt(3, metaInfo.getTraceId()[2].intValue()); - ps.setInt(4, metaInfo.getTraceId()[3].intValue()); - ps.setInt(5, metaInfo.getTraceId()[4].intValue()); - ps.setInt(6, metaInfo.getTraceId()[5].intValue()); - ps.setInt(7, metaInfo.getSpanType().getValue()); - ps.setLong(8, metaInfo.getFileName().getName()); - ps.setInt(9, metaInfo.getFileName().getSuffix()); - ps.setLong(10, metaInfo.getOffset()); - ps.setInt(11, metaInfo.getLength()); - ps.addBatch(); - if (++currentIndex > Constants.MAX_BATCH_SIZE) { - ps.executeBatch(); - isCommitted = true; - } else { - isCommitted = false; - } - } - - if (!isCommitted) { - ps.executeBatch(); - } - - } finally { - if (ps != null) - ps.close(); - } - - } - - public long fetchIndexSize() throws SQLException { - PreparedStatement ps = connection.prepareStatement(QUERY_INDEX_SIZE); - ResultSet rs = ps.executeQuery(); - rs.next(); - - long indexSize = rs.getLong("INDEX_SIZE"); - rs.close(); - ps.close(); - - return indexSize; - } - - public IndexMetaCollection queryByTraceId(Long[] traceId) { - try { - PreparedStatement ps = connection.prepareStatement(QUERY_TRACE_ID); - ps.setInt(1, traceId[0].intValue()); - ps.setLong(2, traceId[1]); - ps.setInt(3, traceId[2].intValue()); - ps.setInt(4, traceId[3].intValue()); - ps.setInt(5, traceId[4].intValue()); - ps.setInt(6, traceId[5].intValue()); - ResultSet rs = ps.executeQuery(); - - IndexMetaCollection collection = new IndexMetaCollection(); - while (rs.next()) { - SpanType spanType = SpanType.convert(rs.getInt("span_type")); - SpanData spanData = null; - - if (SpanType.ACKSpan == spanType) { - spanData = new AckSpanData(); - } else if (SpanType.RequestSpan == spanType) { - spanData = new RequestSpanData(); - } - - collection.add(new IndexMetaInfo(spanData, - new DataFileNameDesc(rs.getLong("file_name"), rs.getInt("file_name_suffix")), - rs.getLong("offset"), rs.getInt("length"))); - } - return collection; - } catch (SQLException e) { - logger.error("Failed to query trace Id [{}]", traceId, e); - return new IndexMetaCollection(); - } - } - - public void close() { - try { - connection.close(); - } catch (SQLException e) { - logger.error("Failed to close index db connector", e); - } - } - - -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnectorCache.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnectorCache.java deleted file mode 100644 index 1d85a1c62f735ac4990362db4eee96dec9443090..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexDBConnectorCache.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.a.eye.skywalking.storage.data.index; - -import com.a.eye.skywalking.storage.config.Config; - -import java.util.LinkedHashMap; -import java.util.Map; - -public class IndexDBConnectorCache { - - private LRUCache cachedOperators; - - public IndexDBConnectorCache() { - cachedOperators = new LRUCache(Config.DataIndex.Operator.CACHE_SIZE); - } - - public IndexDBConnector get(long timestamp) { - IndexDBConnector connector = (IndexDBConnector) cachedOperators.get(timestamp); - if (connector == null) { - connector = new IndexDBConnector(timestamp); - updateCache(timestamp, connector); - } - return connector; - } - - public void close(){ - cachedOperators.close(); - } - - private void updateCache(long timestamp, IndexDBConnector operator) { - cachedOperators.put(timestamp, operator); - } - - private void removedCache(IndexDBConnector connector) { - connector.close(); - } - - private class LRUCache extends LinkedHashMap { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - boolean isNeedRemove = size() > Config.DataIndex.Operator.CACHE_SIZE; - if (isNeedRemove) { - removedCache(eldest.getValue()); - } - return isNeedRemove; - } - - public LRUCache(int cacheSize) { - super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true); - } - - public void close(){ - for(Map.Entry entry : this.entrySet()){ - entry.getValue().close(); - } - } - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexMetaCollection.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexMetaCollection.java index cf8d4e7faf7ca7dda9cd6a6a049027481c75aa0f..401d2c54d0e9170ef1118468263853748f7ad072 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexMetaCollection.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexMetaCollection.java @@ -1,9 +1,6 @@ package com.a.eye.skywalking.storage.data.index; -import com.a.eye.skywalking.storage.block.index.BlockFinder; -import com.a.eye.skywalking.storage.block.index.BlockIndexEngine; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java deleted file mode 100644 index 748d5c128725ba555b1ae709a7519ac97d3da915..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.a.eye.skywalking.storage.data.index; - -import com.a.eye.skywalking.storage.data.IndexDataCapacityMonitor; -import com.a.eye.skywalking.storage.data.exception.IndexMetaStoredFailedException; - -public class IndexOperator { - - private IndexDBConnector connector; - private long timestamp; - - private IndexOperator(IndexDBConnector connector) { - this.connector = connector; - timestamp = connector.getTimestamp(); - } - - public void batchUpdate(IndexMetaGroup metaGroup) { - try { - connector.batchUpdate(metaGroup); - IndexDataCapacityMonitor.addIndexData(timestamp, metaGroup.size()); - } catch (Exception e) { - throw new IndexMetaStoredFailedException("Failed to batch save index meta", e); - } - } - - public static IndexOperator newOperator(IndexDBConnector indexDBConnector) { - return new IndexOperator(indexDBConnector); - } -} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/Executor.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/Executor.java new file mode 100644 index 0000000000000000000000000000000000000000..a61133f7fb56cc2ba0c04fc460cad04259b8dd12 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/Executor.java @@ -0,0 +1,7 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; + +interface Executor { + T execute(IndexOperator indexOperator); +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/FinderExecutor.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/FinderExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..688c833b9c222c026cdee59a04b76d8c245a44e9 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/FinderExecutor.java @@ -0,0 +1,21 @@ +package com.a.eye.skywalking.storage.data.index.operator; + + +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; + +/** + * Created by xin on 2016/11/20. + */ +public class FinderExecutor implements Executor { + + private Long[] traceIdSegment; + + public FinderExecutor(Long[] traceIdSegment) { + this.traceIdSegment = traceIdSegment; + } + + @Override + public com.a.eye.skywalking.storage.data.index.IndexMetaCollection execute(IndexOperator indexOperator) { + return indexOperator.findIndex(traceIdSegment); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperateExecutor.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperateExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..2f4113f26e48eed5727f6769361518981d4cbc94 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperateExecutor.java @@ -0,0 +1,36 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.storage.config.Config; +import com.a.eye.skywalking.storage.data.exception.IndexOperateFailedException; +import com.a.eye.skywalking.storage.data.index.operator.pool.IndexOperatorPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.elasticsearch.client.transport.TransportClient; + +public class IndexOperateExecutor { + + private static IndexOperatorPool indexOperatorPool; + + public static T execute(Executor executor) { + TransportClient client = null; + try { + client = indexOperatorPool.borrowObject(); + return executor.execute(new IndexOperatorImpl(client)); + } catch (Exception e) { + throw new IndexOperateFailedException("Index operate failed.", e); + } finally { + indexOperatorPool.returnObject(client); + } + } + + static { + initializeIndexOperatorPool(); + } + + private static void initializeIndexOperatorPool() { + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxTotal(Config.IndexOperator.Finder.TOTAL); + poolConfig.setMaxIdle(Config.IndexOperator.Finder.IDEL); + poolConfig.setTestOnBorrow(true); + indexOperatorPool = new IndexOperatorPool(poolConfig); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperator.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperator.java new file mode 100644 index 0000000000000000000000000000000000000000..8ecd0b83157c9db27730d4232ea6873348eda2e1 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperator.java @@ -0,0 +1,9 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; + +public interface IndexOperator { + void batchUpdate(IndexMetaCollection metaInfos); + + IndexMetaCollection findIndex(Long[] traceId); +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperatorImpl.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperatorImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..cd050f2bf86e34bce76fa08d344a5147ca68f469 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/IndexOperatorImpl.java @@ -0,0 +1,62 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.health.report.HealthCollector; +import com.a.eye.skywalking.health.report.HeathReading; +import com.a.eye.skywalking.logging.api.ILog; +import com.a.eye.skywalking.logging.api.LogManager; +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; +import com.a.eye.skywalking.storage.data.index.IndexMetaInfo; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +public class IndexOperatorImpl implements IndexOperator { + + private static ILog logger = LogManager.getLogger(IndexOperatorImpl.class); + + private TransportClient client; + + public IndexOperatorImpl(TransportClient client) { + this.client = client; + } + + @Override + public void batchUpdate(IndexMetaCollection metaInfos) { + BulkRequestBuilder requestBuilder = client.prepareBulk(); + for (IndexMetaInfo indexMetaInfo : metaInfos) { + try { + requestBuilder.add(client.prepareIndex("skywalking", "index").setSource(buildSource(indexMetaInfo))); + } catch (Exception e) { + logger.error("Failed to update index.", e); + HealthCollector.getCurrentHeathReading("IndexOperator") + .updateData(HeathReading.ERROR, "Failed to " + "update index."); + } + } + + BulkResponse bulkRequest = requestBuilder.get(); + if (bulkRequest.hasFailures()) { + HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR, + "Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage()); + } + } + + private XContentBuilder buildSource(IndexMetaInfo indexMetaInfo) throws IOException { + XContentBuilder xContentBuilder = jsonBuilder().startObject().field("traceid_s0", indexMetaInfo.getTraceId()[0]) + .field("traceid_s1", indexMetaInfo.getTraceId()[1]).field("traceid_s2", indexMetaInfo.getTraceId()[2]) + .field("traceid_s3", indexMetaInfo.getTraceId()[3]).field("traceid_s4", indexMetaInfo.getTraceId()[4]) + .field("traceid_s5", indexMetaInfo.getTraceId()[5]).field("fileName", indexMetaInfo.getFileName()) + .field("offset", indexMetaInfo.getOffset()).field("length", indexMetaInfo.getLength()).endObject(); + return xContentBuilder; + } + + @Override + public IndexMetaCollection findIndex(Long[] traceId) { + return null; + } + +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/OperatorFactory.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/OperatorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..9e654f1e9b59cd5614ac890c6976add53f842113 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/OperatorFactory.java @@ -0,0 +1,22 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.storage.config.Config; +import com.a.eye.skywalking.storage.data.exception.IndexOperatorInitializeFailedException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import java.net.InetAddress; + +public class OperatorFactory { + + public static IndexOperator createIndexOperator() { + try { + return new IndexOperatorImpl(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress( + new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT))); + } catch (Exception e) { + throw new IndexOperatorInitializeFailedException("Failed to initialze operator.", e); + } + } + +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/UpdateExecutor.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/UpdateExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..67d69a9cb619aedbed93f8b5533cfa40ac150a2a --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/UpdateExecutor.java @@ -0,0 +1,20 @@ +package com.a.eye.skywalking.storage.data.index.operator; + +import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; + +/** + * Created by xin on 2016/11/20. + */ +public class UpdateExecutor implements Executor { + + private IndexMetaCollection metaCollection; + + public UpdateExecutor(IndexMetaCollection metaCollection) { + this.metaCollection = metaCollection; + } + + @Override + public IndexMetaCollection execute(IndexOperator indexOperator) { + return null; + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPool.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPool.java new file mode 100644 index 0000000000000000000000000000000000000000..8469ad507f41a44a6215c323811acdefb677b07a --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPool.java @@ -0,0 +1,11 @@ +package com.a.eye.skywalking.storage.data.index.operator.pool; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.elasticsearch.client.transport.TransportClient; + +public class IndexOperatorPool extends GenericObjectPool { + public IndexOperatorPool(GenericObjectPoolConfig poolConfig) { + super(new IndexOperatorPooledObjectFactory(), poolConfig); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPooledObjectFactory.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPooledObjectFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..326582512d6b3b8f53ccde8550e8ef5a1c17fc67 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/operator/pool/IndexOperatorPooledObjectFactory.java @@ -0,0 +1,31 @@ +package com.a.eye.skywalking.storage.data.index.operator.pool; + +import com.a.eye.skywalking.storage.config.Config; +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import java.net.InetAddress; + +public class IndexOperatorPooledObjectFactory extends BasePooledObjectFactory { + + @Override + public TransportClient create() throws Exception { + return new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress( + new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT)); + } + + @Override + public PooledObject wrap(org.elasticsearch.client.transport.TransportClient client) { + return new DefaultPooledObject<>(client); + } + + @Override + public void destroyObject(PooledObject p) throws Exception { + super.destroyObject(p); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/main/resources/config.properties b/skywalking-storage-center/skywalking-storage/src/main/resources/config.properties index ebd45e97bf59e0daf2223b1fe317837f97141106..4b9ed9624daa4deb4790a206eb9c4d9c02bc2176 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/resources/config.properties +++ b/skywalking-storage-center/skywalking-storage/src/main/resources/config.properties @@ -10,29 +10,11 @@ server.port=34000 # the size of data consumer #dataconsumer.consumer_size = 5 # -# the path that storage block index -#blockindex.path=/block-index -# -# the name of file which storage block index -#blockindex.file_name=data_file.index -# # the path that storage data file #datafile.path=/data/file # # the max size of data file (byte) -datafile.size=50000000 -# -# the path that storage data index -#dataindex.path=/data/index -# -# the name of the file which storage data index -#dataindex.file_name=dataIndex -# -# the max size of data index -dataindex.size=1000000 -# -# the cached size of data index operator -#dataindex.operator.cache_size=10 +#datafile.size=50000000 # # the auth info which registry center #registrycenter.auth_info= @@ -57,3 +39,12 @@ registrycenter.connect_url=127.0.0.1:2181 # # the min idle of data source that finder operate #finder.datasource.min_idle=5 +# +# the port which data index listening +DataIndex.INDEX_LISTEN_PORT=9300 +# +# the total size of finder +#indexoperator.finder.total=50 +# +# the max idel of finder +#indexoperator.finder.idel=20 diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java index 07252cb9fd29c08d86d6d6863c832e9a8c632c2b..59bd31437ebdb4c0ec7624361ed5cf455f7fa3c3 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java @@ -9,7 +9,7 @@ import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub; public class StorageClient { private static ManagedChannel channel = - ManagedChannelBuilder.forAddress("10.128.35.79", 34000).usePlaintext(true).build(); + ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build(); private static SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageServiceStub = newStub(channel); @@ -21,20 +21,21 @@ public class StorageClient { public static void main(String[] args) throws InterruptedException { RequestSpan requestSpan = RequestSpan.newBuilder().setSpanType(1).setAddress("127.0.0.1").setApplicationId("1").setCallType("1") - .setLevelId(0).setProcessNo("19287").setStartDate(System.currentTimeMillis()).setTraceId( + .setParentLevel("0.0.0").setLevelId(0).setProcessNo("19287") + .setStartDate(System.currentTimeMillis()).setTraceId( TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828) .addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1") - .setViewPointId("http://localhost:8080/wwww/test/helloWorld").build(); + .setViewPointId("com.a.eye.skywalking.storage.block.index.BlockIndexEngine.newFinder").build(); - AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId( + AckSpan ackSpan = AckSpan.newBuilder().setParentLevel("0.0.0").setLevelId(0).setCost(10).setTraceId( TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828) .addSegments(2277).addSegments(53).addSegments(3).build()).setStatusCode(0) - .setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); + .setViewpointId("com.a.eye.skywalking.storage.block.index.BlockIndexEngine.newFinder").build(); long startTime = System.currentTimeMillis(); - for (int i = 0; i < 1000000; i++) { + for (int i = 0; i < 10000; i++) { StreamObserver ackSpanStreamObserver = spanStorageServiceStub.storageACKSpan(new StreamObserver() { @Override @@ -86,7 +87,7 @@ public class StorageClient { requestSpanStreamObserver.onCompleted(); - if(i % 500_000 == 0){ + if (i % 500_000 == 0) { System.out.println(i); } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java index 70d92127a402c847eac8ec215b6c6b880c13ac75..8edc4b69cde884d9c9b6e2eb05efff178c6d9e65 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java @@ -36,7 +36,7 @@ public class SearchClient { StreamObserver searchResult = searchServiceStub.search(serverStreamObserver); searchResult.onNext(QueryTask.newBuilder().setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828) + TraceId.newBuilder().addSegments(201611).addSegments(1479267274243L).addSegments(8504828) .addSegments(2277).addSegments(53).addSegments(3).build()).setTaskId(1).build()); searchResult.onCompleted();