提交 dcf859df 编写于 作者: A ascrutae

完成按照TraceID读取数据的逻辑

上级 06f171d8
......@@ -2,10 +2,7 @@ package com.a.eye.skywalking.storage.data;
import com.a.eye.datacarrier.consumer.IConsumer;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.IndexDBConnector;
import com.a.eye.skywalking.storage.data.index.IndexMetaGroup;
import com.a.eye.skywalking.storage.data.index.IndexOperator;
import com.a.eye.skywalking.storage.data.index.IndexDBConnectorCache;
import com.a.eye.skywalking.storage.data.index.*;
import java.util.Iterator;
import java.util.List;
......@@ -15,7 +12,7 @@ public class SpanDataConsumer implements IConsumer<SpanData> {
private IndexDBConnectorCache cache;
private DataFileWriter fileWriter;
public SpanDataConsumer(){
public SpanDataConsumer() {
cache = new IndexDBConnectorCache();
fileWriter = new DataFileWriter();
}
......@@ -23,18 +20,24 @@ public class SpanDataConsumer implements IConsumer<SpanData> {
@Override
public void consume(List<SpanData> data) {
Iterator<IndexMetaGroup> iterator = fileWriter.write(data).group();
Iterator<IndexMetaGroup<Long>> iterator =
IndexMetaCollections.group(fileWriter.write(data), new GroupKeyBuilder<Long>() {
@Override
public Long buildKey(IndexMetaInfo metaInfo) {
return metaInfo.getStartTime();
}
}).iterator();
while (iterator.hasNext()) {
IndexMetaGroup metaGroup = iterator.next();
IndexMetaGroup<Long> metaGroup = iterator.next();
IndexOperator indexOperator = IndexOperator.newOperator(getDBConnector(metaGroup));
indexOperator.batchUpdate(metaGroup);
}
}
private IndexDBConnector getDBConnector(IndexMetaGroup metaGroup){
return cache.get(metaGroup.getTimestamp());
private IndexDBConnector getDBConnector(IndexMetaGroup<Long> metaGroup) {
return cache.get(metaGroup.getKey());
}
@Override
......
package com.a.eye.skywalking.storage.data;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import com.a.eye.skywalking.storage.data.file.DataFileReader;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Created by xin on 2016/11/6.
*/
public class SpanDataFinder {
public static List<byte[]> find(String traceId) {
long blockIndex = BlockIndexEngine.newFinder().find(fetchStartTimeFromTraceId(traceId));
IndexDBConnector indexDBConnector = new IndexDBConnector(blockIndex);
IndexMetaCollection indexMetaCollection = indexDBConnector.queryByTraceId(traceId);
Iterator<IndexMetaGroup<String>> iterator =
IndexMetaCollections.group(indexMetaCollection, new GroupKeyBuilder<String>() {
@Override
public String buildKey(IndexMetaInfo metaInfo) {
return metaInfo.getFileName();
}
}).iterator();
List<byte[]> result = new ArrayList<byte[]>();
while (iterator.hasNext()) {
IndexMetaGroup<String> group = iterator.next();
result.addAll(new DataFileReader(group.getKey()).read(group.getMetaInfo()));
}
return result;
}
private static long fetchStartTimeFromTraceId(String traceId) {
return -1;
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/6.
*/
public class SpanDataReadFailedException extends RuntimeException {
public SpanDataReadFailedException(String message, Exception e) {
super(message, e);
}
}
......@@ -4,6 +4,7 @@ import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.SpanData;
import com.a.eye.skywalking.storage.data.exception.DataFileOperatorCreateFailedException;
import com.a.eye.skywalking.storage.data.exception.SpanDataPersistenceFailedException;
import com.a.eye.skywalking.storage.data.exception.SpanDataReadFailedException;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import java.io.File;
......@@ -26,9 +27,8 @@ public class DataFile {
operator = new DataFileOperator();
}
public DataFile(String fileName, long offset) {
public DataFile(String fileName) {
this.fileName = fileName;
this.currentOffset = offset;
operator = new DataFileOperator();
}
......@@ -62,6 +62,18 @@ public class DataFile {
}
}
public byte[] read(long offset, int length) {
byte[] data = new byte[length];
try {
operator.getReader().getChannel().position(offset);
operator.getReader().read(data, 0, length);
return data;
} catch (IOException e) {
throw new SpanDataReadFailedException(
"Failed to read dataFile[" + fileName + "], offset: " + offset + " " + "lenght: " + length, e);
}
}
class DataFileOperator {
private FileOutputStream writer;
private FileInputStream reader;
......
package com.a.eye.skywalking.storage.data.file;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Created by xin on 2016/11/6.
*/
public class DataFileReader {
private DataFile dataFile;
public DataFileReader(String fileName) {
dataFile = new DataFile(fileName);
}
public List<byte[]> read(List<IndexMetaInfo> metaInfo) {
List<byte[]> metaData = new ArrayList<byte[]>();
for (IndexMetaInfo indexMetaInfo : metaInfo){
metaData.add(dataFile.read(indexMetaInfo.getOffset(), indexMetaInfo.getLength()));
}
return metaData;
}
}
package com.a.eye.skywalking.storage.data.file;
import com.a.eye.skywalking.storage.data.SpanData;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollections;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import java.util.List;
......@@ -13,12 +13,12 @@ public class DataFileWriter {
dataFile = DataFilesManager.createNewDataFile();
}
public IndexMetaCollections write(List<SpanData> spanData) {
public IndexMetaCollection write(List<SpanData> spanData) {
if (dataFile.overLimitLength()) {
dataFile = DataFilesManager.createNewDataFile();
}
IndexMetaCollections collections = new IndexMetaCollections();
IndexMetaCollection collections = new IndexMetaCollection();
for (SpanData data : spanData) {
collections.add(dataFile.write(data));
}
......
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
/**
* Created by xin on 2016/11/6.
*/
public interface GroupKeyBuilder<T> {
T buildKey(IndexMetaInfo metaInfo);
}
......@@ -86,7 +86,7 @@ public class IndexDBConnector {
return timestamp;
}
public void batchUpdate(IndexMetaGroup metaGroup) throws SQLException {
public void batchUpdate(IndexMetaGroup<Long> metaGroup) throws SQLException {
int currentIndex = 0;
PreparedStatement ps = connection.prepareStatement(INSERT_INDEX);
for (IndexMetaInfo metaInfo : metaGroup.getMetaInfo()) {
......@@ -117,6 +117,10 @@ public class IndexDBConnector {
return indexSize;
}
public IndexMetaCollection queryByTraceId(String traceId) {
return null;
}
class ConnectURLGenerator {
private String basePath;
......
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.block.index.BlockFinder;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class IndexMetaCollection implements Iterable<IndexMetaInfo> {
private List<IndexMetaInfo> metaInfo;
private BlockFinder finder;
public IndexMetaCollection() {
metaInfo = new ArrayList<>();
finder = BlockIndexEngine.newFinder();
}
public Iterator<IndexMetaGroup> group() {
List<IndexMetaGroup> indexMetaGroups = new ArrayList<IndexMetaGroup>();
for (IndexMetaInfo info : metaInfo) {
long timestamp = finder.find(info.getStartTime());
int index = indexMetaGroups.indexOf(new IndexMetaGroup(timestamp));
IndexMetaGroup metaGroup;
if (index == -1) {
metaGroup = new IndexMetaGroup(timestamp);
indexMetaGroups.add(metaGroup);
} else {
metaGroup = indexMetaGroups.get(index);
}
metaGroup.addIndexMetaInfo(info);
}
return indexMetaGroups.iterator();
}
public void add(IndexMetaInfo info) {
metaInfo.add(info);
}
@Override
public Iterator<IndexMetaInfo> iterator() {
return metaInfo.iterator();
}
}
package com.a.eye.skywalking.storage.data.index;
import com.a.eye.skywalking.storage.block.index.BlockFinder;
import com.a.eye.skywalking.storage.block.index.BlockIndexEngine;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Created by xin on 2016/11/6.
*/
public class IndexMetaCollections {
private List<IndexMetaInfo> metaInfo;
private BlockFinder finder;
public IndexMetaCollections() {
metaInfo = new ArrayList<>();
finder = BlockIndexEngine.newFinder();
}
public static <T> List<IndexMetaGroup<T>> group(IndexMetaCollection indexMetaCollection,
GroupKeyBuilder<T> builder) {
List<IndexMetaGroup<T>> indexMetaGroups = new ArrayList<IndexMetaGroup<T>>();
public Iterator<IndexMetaGroup> group() {
List<IndexMetaGroup> indexMetaGroups = new ArrayList<IndexMetaGroup>();
for (IndexMetaInfo info : metaInfo) {
long timestamp = finder.find(info.getStartTime());
for (IndexMetaInfo metaInfo : indexMetaCollection) {
T key = builder.buildKey(metaInfo);
int index = indexMetaGroups.indexOf(new IndexMetaGroup(timestamp));
int index = indexMetaGroups.indexOf(new IndexMetaGroup(key));
IndexMetaGroup metaGroup;
if (index == -1) {
metaGroup = new IndexMetaGroup(timestamp);
metaGroup = new IndexMetaGroup(key);
indexMetaGroups.add(metaGroup);
} else {
metaGroup = indexMetaGroups.get(index);
}
metaGroup.addIndexMetaInfo(info);
metaGroup.addIndexMetaInfo(metaInfo);
}
return indexMetaGroups.iterator();
}
public void add(IndexMetaInfo info) {
metaInfo.add(info);
return indexMetaGroups;
}
}
package com.a.eye.skywalking.storage.data.index;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Created by xin on 2016/11/4.
*/
public class IndexMetaGroup {
public class IndexMetaGroup<V>{
private long timestamp;
private V key;
private List<IndexMetaInfo> metaInfo;
public IndexMetaGroup(long timestamp) {
this.timestamp = timestamp;
public IndexMetaGroup(V key) {
this.key = key;
metaInfo = new ArrayList<IndexMetaInfo>();
}
public long getTimestamp() {
return timestamp;
public V getKey() {
return key;
}
public List<IndexMetaInfo> getMetaInfo() {
......@@ -36,18 +37,19 @@ public class IndexMetaGroup {
if (o == null || getClass() != o.getClass())
return false;
IndexMetaGroup that = (IndexMetaGroup) o;
IndexMetaGroup<?> that = (IndexMetaGroup<?>) o;
return timestamp == that.timestamp;
return key.equals(that.key);
}
@Override
public int hashCode() {
return (int) (timestamp ^ (timestamp >>> 32));
return key.hashCode();
}
public int size() {
return metaInfo.size();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册