提交 43cbb6fa 编写于 作者: A ascrutae

fix some issues

上级 9147fec6
package com.a.eye.skywalking.routing.disruptor.ack;
import com.lmax.disruptor.EventHandler;
/**
* Created by xin on 2017/2/8.
*/
public class AckSpanClearEventHandler implements EventHandler<AckSpanHolder> {
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
event.setAckSpan(null);
}
}
......@@ -26,7 +26,7 @@ public class AckSpanDisruptor extends AbstractSpanDisruptor {
public AckSpanDisruptor(String connectionURL) {
ackSpanDisruptor = new Disruptor<AckSpanHolder>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanEventHandler = new RouteAckSpanBufferEventHandler(connectionURL);
ackSpanDisruptor.handleEventsWith(ackSpanEventHandler, new SpanAlarmHandler());
ackSpanDisruptor.handleEventsWith(ackSpanEventHandler).then(new SpanAlarmHandler()).then(new AckSpanClearEventHandler());
ackSpanDisruptor.start();
ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer();
}
......
package com.a.eye.skywalking.storage.disruptor.ack;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
/**
* @author zhangxin
*/
public class AckSpanDataHolder {
private AckSpanData ackSpanData;
public AckSpanData getAckSpanData() {
return ackSpanData;
}
public void clearData() {
this.ackSpanData = null;
}
public void fillData(AckSpan ackSpan) {
this.ackSpanData = new AckSpanData(ackSpan);
}
}
......@@ -6,9 +6,9 @@ import com.lmax.disruptor.EventFactory;
/**
* Created by wusheng on 2016/11/24.
*/
public class AckSpanFactory implements EventFactory<AckSpanData> {
public class AckSpanFactory implements EventFactory<AckSpanDataHolder> {
@Override
public AckSpanData newInstance() {
return new AckSpanData();
public AckSpanDataHolder newInstance() {
return new AckSpanDataHolder();
}
}
......@@ -11,6 +11,7 @@ import com.a.eye.skywalking.storage.data.index.IndexOperator;
import com.a.eye.skywalking.storage.data.index.IndexOperatorFactory;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.a.eye.skywalking.storage.disruptor.request.RequestSpanDataHolder;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
......@@ -19,7 +20,7 @@ import java.util.List;
/**
* Created by wusheng on 2016/11/24.
*/
public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
public class StoreAckSpanEventHandler implements EventHandler<AckSpanDataHolder> {
private static ILog logger = LogManager.getLogger(StoreAckSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
......@@ -34,21 +35,25 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
}
@Override
public void onEvent(AckSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
public void onEvent(AckSpanDataHolder event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event.getAckSpanData());
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.clearData();
}
}
}
package com.a.eye.skywalking.storage.disruptor.request;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
/**
* @author zhangxin
*/
public class RequestSpanDataHolder {
private RequestSpanData requestSpanData;
public void clearData() {
this.requestSpanData = null;
}
public RequestSpanData getRequestSpanData() {
return requestSpanData;
}
public void fillData(RequestSpan requestSpan) {
requestSpanData = new RequestSpanData(requestSpan);
}
}
......@@ -6,9 +6,9 @@ import com.lmax.disruptor.EventFactory;
/**
* Created by wusheng on 2016/11/24.
*/
public class RequestSpanFactory implements EventFactory<RequestSpanData> {
public class RequestSpanFactory implements EventFactory<RequestSpanDataHolder> {
@Override
public RequestSpanData newInstance() {
return new RequestSpanData();
public RequestSpanDataHolder newInstance() {
return new RequestSpanDataHolder();
}
}
......@@ -19,7 +19,7 @@ import java.util.List;
/**
* Created by wusheng on 2016/11/24.
*/
public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanData> {
public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDataHolder> {
private static ILog logger = LogManager.getLogger(StoreRequestSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
......@@ -34,23 +34,26 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
}
@Override
public void onEvent(RequestSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
public void onEvent(RequestSpanDataHolder event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event.getRequestSpanData());
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.clearData();
}
}
}
......@@ -8,10 +8,10 @@ import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.listener.server.SpanStorageServerListener;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.spandata.AckSpanData;
import com.a.eye.skywalking.storage.data.spandata.RequestSpanData;
import com.a.eye.skywalking.storage.disruptor.ack.AckSpanDataHolder;
import com.a.eye.skywalking.storage.disruptor.ack.AckSpanFactory;
import com.a.eye.skywalking.storage.disruptor.ack.StoreAckSpanEventHandler;
import com.a.eye.skywalking.storage.disruptor.request.RequestSpanDataHolder;
import com.a.eye.skywalking.storage.disruptor.request.RequestSpanFactory;
import com.a.eye.skywalking.storage.disruptor.request.StoreRequestSpanEventHandler;
import com.lmax.disruptor.RingBuffer;
......@@ -22,19 +22,19 @@ public class StorageListener implements SpanStorageServerListener {
private ILog logger = LogManager.getLogger(StorageListener.class);
private Disruptor<RequestSpanData> requestSpanDisruptor;
private RingBuffer<RequestSpanData> requestSpanRingBuffer;
private Disruptor<RequestSpanDataHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanDataHolder> requestSpanRingBuffer;
private Disruptor<AckSpanData> ackSpanDisruptor;
private RingBuffer<AckSpanData> ackSpanRingBuffer;
private Disruptor<AckSpanDataHolder> ackSpanDisruptor;
private RingBuffer<AckSpanDataHolder> ackSpanRingBuffer;
public StorageListener() {
requestSpanDisruptor = new Disruptor<RequestSpanData>(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
requestSpanDisruptor = new Disruptor<RequestSpanDataHolder>(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
requestSpanDisruptor.handleEventsWith(new StoreRequestSpanEventHandler());
requestSpanDisruptor.start();
requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer();
ackSpanDisruptor = new Disruptor<AckSpanData>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanDisruptor = new Disruptor<AckSpanDataHolder>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanDisruptor.handleEventsWith(new StoreAckSpanEventHandler());
ackSpanDisruptor.start();
ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer();
......@@ -44,8 +44,8 @@ public class StorageListener implements SpanStorageServerListener {
public boolean storage(RequestSpan requestSpan) {
long sequence = requestSpanRingBuffer.next(); // Grab the next sequence
try {
RequestSpanData data = requestSpanRingBuffer.get(sequence);
data.setRequestSpan(requestSpan);
RequestSpanDataHolder data = requestSpanRingBuffer.get(sequence);
data.fillData(requestSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "RequestSpan stored.");
return true;
......@@ -62,8 +62,8 @@ public class StorageListener implements SpanStorageServerListener {
public boolean storage(AckSpan ackSpan) {
long sequence = ackSpanRingBuffer.next(); // Grab the next sequence
try {
AckSpanData data = ackSpanRingBuffer.get(sequence);
data.setAckSpan(ackSpan);
AckSpanDataHolder data = ackSpanRingBuffer.get(sequence);
data.fillData(ackSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "AckSpan stored.");
return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册