提交 9561a50f 编写于 作者: wu-sheng's avatar wu-sheng

use disruptor to replace DataCarrier, to improve performance

上级 c30a4234
......@@ -25,9 +25,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>data-carrier</artifactId>
<version>1.2</version>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
......
......@@ -8,6 +8,10 @@ public class Config {
public static int PORT = 34000;
}
public static class Disruptor{
public static int BUFFER_SIZE = 1024 * 128;
}
public static class DataConsumer {
......
package com.a.eye.skywalking.storage.data.file;
import com.a.eye.datacarrier.common.AtomicRangeInteger;
import com.a.eye.skywalking.storage.util.AtomicRangeInteger;
import java.text.ParseException;
import java.text.SimpleDateFormat;
......
......@@ -23,11 +23,14 @@ public class DataFileWriter {
for (SpanData data : spanData) {
collections.add(dataFile.write(data));
}
dataFile.flush();
return collections;
}
public void flush(){
dataFile.flush();
}
public void close(){
dataFile.close();
}
......
......@@ -16,6 +16,10 @@ public class AckSpanData extends AbstractSpanData {
public AckSpanData() {
}
public void setAckSpan(AckSpan ackSpan) {
this.ackSpan = ackSpan;
}
@Override
public SpanType getSpanType() {
return SpanType.ACKSpan;
......
......@@ -16,6 +16,10 @@ public class RequestSpanData extends AbstractSpanData {
public RequestSpanData() {
}
public void setRequestSpan(RequestSpan requestSpan) {
this.requestSpan = requestSpan;
}
@Override
public SpanType getSpanType() {
return SpanType.RequestSpan;
......
package com.a.eye.skywalking.storage.disruptor.ack;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.lmax.disruptor.EventFactory;
/**
* Created by wusheng on 2016/11/24.
*/
public class AckSpanFactory implements EventFactory<AckSpanData> {
@Override
public AckSpanData newInstance() {
return new AckSpanData();
}
}
package com.a.eye.skywalking.storage.data;
package com.a.eye.skywalking.storage.disruptor.ack;
import com.a.eye.datacarrier.consumer.IConsumer;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
......@@ -9,41 +8,38 @@ import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexOperator;
import com.a.eye.skywalking.storage.data.index.IndexOperatorFactory;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;
public class SpanDataConsumer implements IConsumer<SpanData> {
private static ILog logger = LogManager.getLogger(SpanDataConsumer.class);
/**
* Created by wusheng on 2016/11/24.
*/
public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
private static ILog logger = LogManager.getLogger(StoreAckSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize = 100;
private List<SpanData> buffer = new ArrayList<>(bufferSize);
@Override
public void init() {
public StoreAckSpanEventHandler() {
fileWriter = new DataFileWriter();
operator = IndexOperatorFactory.createIndexOperator();
}
@Override
public void consume(List<SpanData> data) {
IndexMetaCollection collection = fileWriter.write(data);
public void onEvent(AckSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
operator.batchUpdate(collection);
if (endOfBatch || buffer.size() == bufferSize) {
IndexMetaCollection collection = fileWriter.write(buffer);
HealthCollector.getCurrentHeathReading("SpanDataConsumer")
.updateData(HeathReading.INFO, "%s messages were successful consumed .", data.size());
}
operator.batchUpdate(collection);
@Override
public void onError(List<SpanData> span, Throwable throwable) {
logger.error("Failed to consumer span data.", throwable);
HealthCollector.getCurrentHeathReading("SpanDataConsumer").updateData(HeathReading.ERROR,
"Failed to consume span data. error message : " + throwable.getMessage());
}
@Override
public void onExit() {
fileWriter.close();
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
}
}
}
package com.a.eye.skywalking.storage.disruptor.request;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
import com.lmax.disruptor.EventFactory;
/**
* Created by wusheng on 2016/11/24.
*/
public class RequestSpanFactory implements EventFactory<RequestSpanData> {
@Override
public RequestSpanData newInstance() {
return new RequestSpanData();
}
}
package com.a.eye.skywalking.storage.disruptor.request;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexOperator;
import com.a.eye.skywalking.storage.data.index.IndexOperatorFactory;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;
/**
* Created by wusheng on 2016/11/24.
*/
public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanData> {
private static ILog logger = LogManager.getLogger(StoreRequestSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize = 100;
private List<SpanData> buffer = new ArrayList<>(bufferSize);
public StoreRequestSpanEventHandler() {
fileWriter = new DataFileWriter();
operator = IndexOperatorFactory.createIndexOperator();
}
@Override
public void onEvent(RequestSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
}
}
}
package com.a.eye.skywalking.storage.listener;
import com.a.eye.datacarrier.DataCarrier;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
......@@ -9,32 +8,48 @@ import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.listener.SpanStorageListener;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.SpanDataConsumer;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanDataBuilder;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
import com.a.eye.skywalking.storage.disruptor.ack.AckSpanFactory;
import com.a.eye.skywalking.storage.disruptor.ack.StoreAckSpanEventHandler;
import com.a.eye.skywalking.storage.disruptor.request.RequestSpanFactory;
import com.a.eye.skywalking.storage.disruptor.request.StoreRequestSpanEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
public class StorageListener implements SpanStorageListener {
private ILog logger = LogManager.getLogger(StorageListener.class);
private DataCarrier<SpanData> spanDataDataCarrier;
private Disruptor<RequestSpanData> requestSpanDisruptor;
private RingBuffer<RequestSpanData> requestSpanRingBuffer;
private Disruptor<AckSpanData> ackSpanDisruptor;
private RingBuffer<AckSpanData> ackSpanRingBuffer;
public StorageListener() {
spanDataDataCarrier = new DataCarrier<>(Config.DataConsumer.CHANNEL_SIZE, Config.DataConsumer.BUFFER_SIZE);
spanDataDataCarrier.consume(SpanDataConsumer.class, Config.DataConsumer.CONSUMER_SIZE);
requestSpanDisruptor = new Disruptor<RequestSpanData>(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
requestSpanDisruptor.handleEventsWith(new StoreRequestSpanEventHandler());
requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer();
ackSpanDisruptor = new Disruptor<AckSpanData>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanDisruptor.handleEventsWith(new StoreAckSpanEventHandler());
ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer();
}
@Override
public boolean storage(RequestSpan requestSpan) {
try {
spanDataDataCarrier.produce(SpanDataBuilder.build(requestSpan));
HealthCollector.getCurrentHeathReading("StorageListener")
.updateData(HeathReading.INFO, "RequestSpan stored.");
long sequence = requestSpanRingBuffer.next(); // Grab the next sequence
RequestSpanData data = requestSpanRingBuffer.get(sequence);
data.setRequestSpan(requestSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "RequestSpan stored.");
return true;
} catch (Exception e) {
logger.error("RequestSpan trace-id[{}] store failure..", requestSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener")
.updateData(HeathReading.ERROR, "RequestSpan store failure.");
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "RequestSpan store failure.");
return false;
}
}
......@@ -42,13 +57,15 @@ public class StorageListener implements SpanStorageListener {
@Override
public boolean storage(AckSpan ackSpan) {
try {
spanDataDataCarrier.produce(SpanDataBuilder.build(ackSpan));
long sequence = ackSpanRingBuffer.next(); // Grab the next sequence
AckSpanData data = ackSpanRingBuffer.get(sequence);
data.setAckSpan(ackSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "AckSpan stored.");
return true;
} catch (Exception e) {
logger.error("AckSpan trace-id[{}] store failure..", ackSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener")
.updateData(HeathReading.ERROR, "AckSpan store failure.");
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "AckSpan store failure.");
return false;
}
}
......
package com.a.eye.skywalking.storage.util;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by wusheng on 2016/10/25.
*/
public class AtomicRangeInteger extends Number implements Serializable {
private static final long serialVersionUID = -4099792402691141643L;
private AtomicInteger value;
private int startValue;
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
this.value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
public final int getAndIncrement() {
int current;
int next;
do {
current = this.value.get();
next = current >= this.endValue?this.startValue:current + 1;
} while(!this.value.compareAndSet(current, next));
return current;
}
public final int get() {
return this.value.get();
}
public int intValue() {
return this.value.intValue();
}
public long longValue() {
return this.value.longValue();
}
public float floatValue() {
return this.value.floatValue();
}
public double doubleValue() {
return this.value.doubleValue();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册