提交 9ce29b69 编写于 作者: A ascrutae

修复文件名时间不对导致查询不出来

上级 fb2877f4
......@@ -12,6 +12,7 @@ import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.config.ConfigInitializer;
import com.a.eye.skywalking.storage.data.file.DataFilesManager;
import com.a.eye.skywalking.storage.data.index.operator.OperatorFactory;
import com.a.eye.skywalking.storage.listener.SearchListener;
import com.a.eye.skywalking.storage.listener.StorageListener;
import com.a.eye.skywalking.storage.util.NetUtils;
......@@ -39,9 +40,10 @@ public class Main {
public static void main(String[] args) {
try {
initializeParam();
HealthCollector.init(SERVER_REPORTER_NAME);
OperatorFactory.initOperatorPool();
DataFilesManager.init();
provider = ServiceProvider.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener())
......
......@@ -28,7 +28,7 @@ public class Config {
public static class DataIndex {
public static final int INDEX_LISTEN_PORT = 9300;
public static int INDEX_LISTEN_PORT = 9300;
}
......
......@@ -3,8 +3,8 @@ package com.a.eye.skywalking.storage.data;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.storage.data.file.DataFileReader;
import com.a.eye.skywalking.storage.data.index.*;
import com.a.eye.skywalking.storage.data.index.operator.FinderExecutor;
import com.a.eye.skywalking.storage.data.index.operator.IndexOperateExecutor;
import com.a.eye.skywalking.storage.data.index.operator.IndexOperator;
import com.a.eye.skywalking.storage.data.index.operator.OperatorFactory;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import java.util.ArrayList;
......@@ -14,8 +14,7 @@ import java.util.List;
public class SpanDataFinder {
public static List<SpanData> find(TraceId traceId) {
IndexMetaCollection indexMetaCollection = IndexOperateExecutor.execute(new FinderExecutor(
traceId.getSegmentsList().toArray(new Long[traceId.getSegmentsCount()])));
IndexMetaCollection indexMetaCollection = fetchIndexMetaInfos(traceId);
if (indexMetaCollection == null) {
return new ArrayList<SpanData>();
......@@ -45,4 +44,18 @@ public class SpanDataFinder {
return result;
}
private static IndexMetaCollection fetchIndexMetaInfos(TraceId traceId) {
IndexMetaCollection indexMetaCollection = new IndexMetaCollection();
IndexOperator indexOperator = null;
try {
indexOperator = OperatorFactory.getIndexOperatorFromPool();
indexMetaCollection =
indexOperator.findIndex(traceId.getSegmentsList().toArray(new Long[traceId.getSegmentsCount()]));
} finally {
if (indexOperator != null)
OperatorFactory.returnIndexOperator(indexOperator);
}
return indexMetaCollection;
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/20.
*/
public class IndexOperateFailedException extends RuntimeException {
public IndexOperateFailedException(String message, Exception e) {
super(message, e);
}
}
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/21.
*/
public class IndexOperatorBorrowFailedException extends RuntimeException {
}
......@@ -17,19 +17,19 @@ public class DataFileNameDesc {
public DataFileNameDesc() {
name = System.currentTimeMillis();
suffix = DATA_FILE_NAME_SUFFIX.getAndIncrement();
fileNameStr = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SS").format(name) + "_" + suffix;
fileNameStr = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").format(name) + "_" + suffix;
}
public DataFileNameDesc(long name, int suffix) {
this.name = name;
this.suffix = suffix;
fileNameStr = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SS").format(name) + "_" + suffix;
fileNameStr = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").format(name) + "_" + suffix;
}
public DataFileNameDesc(String fileName) {
int lastIndex = fileName.lastIndexOf('_');
try {
this.name = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SS").parse(fileName.substring(0, lastIndex - 1))
this.name = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").parse(fileName.substring(0, lastIndex - 1))
.getTime();
} catch (ParseException e) {
}
......
package com.a.eye.skywalking.storage.data.index.operator;
interface Executor<T> {
T execute(IndexOperator indexOperator);
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
/**
* Created by xin on 2016/11/20.
*/
public class FinderExecutor implements Executor<IndexMetaCollection> {
private Long[] traceIdSegment;
public FinderExecutor(Long[] traceIdSegment) {
this.traceIdSegment = traceIdSegment;
}
@Override
public IndexMetaCollection execute(IndexOperator indexOperator) {
return indexOperator.findIndex(traceIdSegment);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.exception.IndexOperateFailedException;
import com.a.eye.skywalking.storage.data.index.operator.pool.IndexOperatorPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.transport.TransportClient;
public class IndexOperateExecutor {
private static IndexOperatorPool indexOperatorPool;
public static <T> T execute(Executor<T> executor) {
TransportClient client = null;
try {
client = indexOperatorPool.borrowObject();
return executor.execute(new IndexOperatorImpl(client));
} catch (Exception e) {
throw new IndexOperateFailedException("Index operate failed.", e);
} finally {
indexOperatorPool.returnObject(client);
}
}
static {
initializeIndexOperatorPool();
}
private static void initializeIndexOperatorPool() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(Config.IndexOperator.Finder.TOTAL);
poolConfig.setMaxIdle(Config.IndexOperator.Finder.IDEL);
poolConfig.setTestOnBorrow(true);
indexOperatorPool = new IndexOperatorPool(poolConfig);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.data.file.DataFileNameDesc;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import com.a.eye.skywalking.storage.data.spandata.SpanType;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
public interface IndexOperator {
int batchUpdate(IndexMetaCollection metaInfos);
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
public class IndexOperator {
private static ILog logger = LogManager.getLogger(IndexOperator.class);
private final String INDEX_NAME = "skywalking";
private final String INDEX_TYPE = "index";
private TransportClient client;
public IndexOperator(TransportClient client) {
this.client = client;
}
public int batchUpdate(IndexMetaCollection metaInfos) {
BulkRequestBuilder requestBuilder = client.prepareBulk();
for (IndexMetaInfo indexMetaInfo : metaInfos) {
try {
requestBuilder.add(client.prepareIndex(INDEX_NAME, INDEX_TYPE).setSource(buildSource(indexMetaInfo)));
} catch (Exception e) {
logger.error("Failed to update index.", e);
HealthCollector.getCurrentHeathReading("IndexOperator")
.updateData(HeathReading.ERROR, "Failed to " + "update index.");
}
}
BulkResponse bulkRequest = requestBuilder.get();
if (bulkRequest.hasFailures()) {
HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR,
"Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage());
}
return metaInfos.size();
}
private XContentBuilder buildSource(IndexMetaInfo indexMetaInfo) throws IOException {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("traceid_s0", indexMetaInfo.getTraceId()[0])
.field("traceid_s1", indexMetaInfo.getTraceId()[1]).field("traceid_s2", indexMetaInfo.getTraceId()[2])
.field("traceid_s3", indexMetaInfo.getTraceId()[3]).field("traceid_s4", indexMetaInfo.getTraceId()[4])
.field("traceid_s5", indexMetaInfo.getTraceId()[5])
.field("span_type", indexMetaInfo.getSpanType().getValue())
.field("fileName", indexMetaInfo.getFileName().getName())
.field("fileName_suffix", indexMetaInfo.getFileName().getSuffix())
.field("offset", indexMetaInfo.getOffset()).field("length", indexMetaInfo.getLength()).endObject();
return xContentBuilder;
}
public IndexMetaCollection findIndex(Long[] traceId) {
int index = 0;
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
for (Long traceIdSegment : traceId) {
queryBuilder.must(termQuery("traceid_s" + index++, traceIdSegment));
}
IndexMetaCollection collection = new IndexMetaCollection();
SearchResponse response =
client.prepareSearch(INDEX_NAME).setTypes(INDEX_TYPE).setQuery(queryBuilder).execute().actionGet();
for (SearchHit hit : response.getHits()) {
DataFileNameDesc desc = new DataFileNameDesc(Long.parseLong(hit.getSource().get("fileName").toString()),
Integer.parseInt(hit.getSource().get("fileName_suffix").toString()));
int length = Integer.parseInt(hit.getSource().get("length").toString());
long offset = Long.parseLong(hit.getSource().get("offset").toString());
SpanType spanType = SpanType.convert(Integer.parseInt(hit.getSource().get("span_type").toString()));
collection.add(new IndexMetaInfo(desc, offset, length, spanType));
}
return collection;
}
public void close() {
client.close();
}
IndexMetaCollection findIndex(Long[] traceId);
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.data.file.DataFileNameDesc;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexMetaInfo;
import com.a.eye.skywalking.storage.data.spandata.SpanType;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
public class IndexOperatorImpl implements IndexOperator {
private static ILog logger = LogManager.getLogger(IndexOperatorImpl.class);
private TransportClient client;
public IndexOperatorImpl(TransportClient client) {
this.client = client;
}
@Override
public int batchUpdate(IndexMetaCollection metaInfos) {
BulkRequestBuilder requestBuilder = client.prepareBulk();
for (IndexMetaInfo indexMetaInfo : metaInfos) {
try {
requestBuilder.add(client.prepareIndex("skywalking", "index").setSource(buildSource(indexMetaInfo)));
} catch (Exception e) {
logger.error("Failed to update index.", e);
HealthCollector.getCurrentHeathReading("IndexOperator")
.updateData(HeathReading.ERROR, "Failed to " + "update index.");
}
}
BulkResponse bulkRequest = requestBuilder.get();
if (bulkRequest.hasFailures()) {
HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR,
"Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage());
}
return metaInfos.size();
}
private XContentBuilder buildSource(IndexMetaInfo indexMetaInfo) throws IOException {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("traceid_s0", indexMetaInfo.getTraceId()[0])
.field("traceid_s1", indexMetaInfo.getTraceId()[1]).field("traceid_s2", indexMetaInfo.getTraceId()[2])
.field("traceid_s3", indexMetaInfo.getTraceId()[3]).field("traceid_s4", indexMetaInfo.getTraceId()[4])
.field("traceid_s5", indexMetaInfo.getTraceId()[5])
.field("span_type", indexMetaInfo.getSpanType().getValue())
.field("fileName", indexMetaInfo.getFileName().getName())
.field("fileName_suffix", indexMetaInfo.getFileName().getSuffix())
.field("offset", indexMetaInfo.getOffset()).field("length", indexMetaInfo.getLength()).endObject();
return xContentBuilder;
}
@Override
public IndexMetaCollection findIndex(Long[] traceId) {
int index = 0;
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
for (Long traceIdSegment : traceId) {
queryBuilder.must(termQuery("traceid_s" + index++, traceIdSegment));
}
IndexMetaCollection collection = new IndexMetaCollection();
SearchResponse response = client.prepareSearch("skywalking").setQuery(queryBuilder).execute().actionGet();
for (SearchHit hit : response.getHits()) {
DataFileNameDesc desc = new DataFileNameDesc(Long.parseLong(hit.getSource().get("fileName").toString()),
Integer.parseInt(hit.getSource().get("fileName_suffix").toString()));
int length = Integer.parseInt(hit.getSource().get("length").toString());
long offset = Long.parseLong(hit.getSource().get("offset").toString());
SpanType spanType = SpanType.convert(Integer.parseInt(hit.getSource().get("span_type").toString()));
collection.add(new IndexMetaInfo(desc, offset, length, spanType));
}
return collection;
}
}
package com.a.eye.skywalking.storage.data.index.operator.pool;
package com.a.eye.skywalking.storage.data.index.operator;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.transport.TransportClient;
public class IndexOperatorPool extends GenericObjectPool<TransportClient> {
public class IndexOperatorPool extends GenericObjectPool<IndexOperator> {
public IndexOperatorPool(GenericObjectPoolConfig poolConfig) {
super(new IndexOperatorPooledObjectFactory(), poolConfig);
}
......
package com.a.eye.skywalking.storage.data.index.operator.pool;
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.config.Config;
import org.apache.commons.pool2.BasePooledObjectFactory;
......@@ -11,21 +11,20 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
public class IndexOperatorPooledObjectFactory extends BasePooledObjectFactory<TransportClient> {
public class IndexOperatorPooledObjectFactory extends BasePooledObjectFactory<IndexOperator> {
@Override
public TransportClient create() throws Exception {
return new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT));
public IndexOperator create() throws Exception {
return OperatorFactory.createIndexOperator();
}
@Override
public PooledObject<TransportClient> wrap(TransportClient client) {
public PooledObject<IndexOperator> wrap(IndexOperator client) {
return new DefaultPooledObject<>(client);
}
@Override
public void destroyObject(PooledObject<TransportClient> p) throws Exception {
super.destroyObject(p);
public void destroyObject(PooledObject<IndexOperator> p) throws Exception {
p.getObject().close();
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.exception.IndexOperatorBorrowFailedException;
import com.a.eye.skywalking.storage.data.exception.IndexOperatorInitializeFailedException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
......@@ -10,13 +12,34 @@ import java.net.InetAddress;
public class OperatorFactory {
private static IndexOperatorPool pool;
public static IndexOperator createIndexOperator() {
try {
return new IndexOperatorImpl(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
return new IndexOperator(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getLocalHost(), Config.DataIndex.INDEX_LISTEN_PORT)));
} catch (Exception e) {
throw new IndexOperatorInitializeFailedException("Failed to initialize operator.", e);
}
}
public static IndexOperator getIndexOperatorFromPool() {
try {
return pool.borrowObject();
} catch (Exception e) {
throw new IndexOperatorBorrowFailedException();
}
}
public static void returnIndexOperator(IndexOperator indexOperator) {
pool.returnObject(indexOperator);
}
public static void initOperatorPool() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(Config.IndexOperator.Finder.TOTAL);
config.setMaxIdle(Config.IndexOperator.Finder.IDEL);
pool = new IndexOperatorPool(config);
}
}
package com.a.eye.skywalking.storage.data.index.operator;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
public class UpdateExecutor implements Executor<Integer> {
private IndexMetaCollection metaCollection;
public UpdateExecutor(IndexMetaCollection metaCollection) {
this.metaCollection = metaCollection;
}
@Override
public Integer execute(IndexOperator indexOperator) {
return indexOperator.batchUpdate(metaCollection);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册