提交 97bd55f3 编写于 作者: wu-sheng's avatar wu-sheng

1.add log and health-report. 2.rename classes. 3. review for bugs. 4. fix some bugs

上级 1fe65792
......@@ -17,6 +17,8 @@ public class Config {
}
public static class Disruptor {
public static int BUFFER_SIZE = 2 ^ 10;
public static int BUFFER_SIZE = 1024 * 128 * 4;
public static int FLUSH_SIZE = 100;
}
}
......@@ -3,20 +3,26 @@ package com.a.eye.skywalking.routing.disruptor;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import com.a.eye.skywalking.routing.config.Config;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;
/**
* Created by xin on 2016/11/29.
*/
public abstract class AbstractSpanEventHandler<T> implements EventHandler<T> {
protected Client client;
protected int bufferSize = 100;
protected boolean stop;
protected volatile boolean previousSendResult = true;
public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T> {
protected Client client;
protected final int bufferSize;
protected boolean stop;
private volatile boolean previousSendFinish = true;
public AbstractSpanEventHandler(String connectionURL) {
public AbstractRouteSpanEventHandler(String connectionURL) {
bufferSize = Config.Disruptor.FLUSH_SIZE;
String[] urlSegment = connectionURL.split(":");
if (urlSegment.length != 2) {
throw new IllegalArgumentException();
......@@ -28,18 +34,16 @@ public abstract class AbstractSpanEventHandler<T> implements EventHandler<T> {
SpanStorageClient spanStorageClient = client.newSpanStorageClient(new StorageClientListener() {
@Override
public void onError(Throwable throwable) {
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.ERROR,
"Failed to send span. error message :" + throwable.getMessage());
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.ERROR, "Failed to send span. error message :" + throwable.getMessage());
}
@Override
public void onBatchFinished() {
previousSendResult = true;
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO,
" consumed Successfully");
previousSendFinish = true;
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO, " consumed Successfully");
}
});
previousSendResult = false;
previousSendFinish = false;
return spanStorageClient;
}
......@@ -49,4 +53,13 @@ public abstract class AbstractSpanEventHandler<T> implements EventHandler<T> {
public void stop() {
stop = true;
}
public void wait2Finish() {
while (!previousSendFinish) {
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
}
}
}
}
......@@ -18,11 +18,11 @@ public class AckSpanDisruptor {
private Disruptor<AckSpanHolder> ackSpanDisruptor;
private RingBuffer<AckSpanHolder> ackSpanRingBuffer;
private AckSpanBufferEventHandler ackSpanEventHandler;
private RouteAckSpanBufferEventHandler ackSpanEventHandler;
public AckSpanDisruptor(String connectionURL) {
ackSpanDisruptor = new Disruptor<AckSpanHolder>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanEventHandler = new AckSpanBufferEventHandler(connectionURL);
ackSpanEventHandler = new RouteAckSpanBufferEventHandler(connectionURL);
ackSpanDisruptor.handleEventsWith(ackSpanEventHandler);
ackSpanDisruptor.start();
ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer();
......
package com.a.eye.skywalking.routing.disruptor.ack;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.disruptor.AbstractRouteSpanEventHandler;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler;
import com.a.eye.skywalking.routing.router.RoutingService;
import java.util.ArrayList;
import java.util.List;
public class AckSpanBufferEventHandler extends AbstractSpanEventHandler<AckSpanHolder> {
private List<AckSpan> buffer = new ArrayList<>(bufferSize);
public class RouteAckSpanBufferEventHandler extends AbstractRouteSpanEventHandler<AckSpanHolder> {
private static ILog logger = LogManager.getLogger(RouteAckSpanBufferEventHandler.class);
private final List<AckSpan> buffer;
public AckSpanBufferEventHandler(String connectionURl) {
public RouteAckSpanBufferEventHandler(String connectionURl) {
super(connectionURl);
buffer = new ArrayList<>(bufferSize);
}
@Override
......@@ -25,30 +31,32 @@ public class AckSpanBufferEventHandler extends AbstractSpanEventHandler<AckSpanH
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getAckSpan());
if (stop){
if (stop) {
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
}finally {
} finally {
buffer.clear();
}
return ;
return;
}
while (!previousSendResult){
try {
Thread.sleep(10L);
}catch (InterruptedException e){
}
}
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
buffer.clear();
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
}
}
......@@ -15,13 +15,13 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
*/
public class RequestSpanDisruptor {
private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class);
private Disruptor<RequestSpanHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanHolder> requestSpanRingBuffer;
private SendRequestSpanEventHandler eventHandler;
private Disruptor<RequestSpanHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanHolder> requestSpanRingBuffer;
private RouteSendRequestSpanEventHandler eventHandler;
public RequestSpanDisruptor(String connectionURL) {
requestSpanDisruptor = new Disruptor<RequestSpanHolder>(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
eventHandler = new SendRequestSpanEventHandler(connectionURL);
eventHandler = new RouteSendRequestSpanEventHandler(connectionURL);
requestSpanDisruptor.handleEventsWith(eventHandler);
requestSpanDisruptor.start();
requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer();
......
package com.a.eye.skywalking.routing.disruptor.request;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler;
import com.a.eye.skywalking.routing.disruptor.AbstractRouteSpanEventHandler;
import java.util.ArrayList;
import java.util.List;
......@@ -13,11 +17,13 @@ import java.util.List;
/**
* Created by xin on 2016/11/27.
*/
public class SendRequestSpanEventHandler extends AbstractSpanEventHandler<RequestSpanHolder> {
private List<RequestSpan> buffer = new ArrayList<>(bufferSize);
public class RouteSendRequestSpanEventHandler extends AbstractRouteSpanEventHandler<RequestSpanHolder> {
private static ILog logger = LogManager.getLogger(RouteSendRequestSpanEventHandler.class);
private final List<RequestSpan> buffer;
public SendRequestSpanEventHandler(String connectionURl) {
public RouteSendRequestSpanEventHandler(String connectionURl) {
super(connectionURl);
buffer = new ArrayList<>(bufferSize);
}
@Override
......@@ -42,17 +48,19 @@ public class SendRequestSpanEventHandler extends AbstractSpanEventHandler<Reques
return;
}
while (!previousSendResult) {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
}
}
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
buffer.clear();
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("RequestSpan messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
}
}
......@@ -76,6 +76,7 @@ public class Router implements NodeChangesListener {
}
}
});
//TODO: BUG, no data release.
disruptors = newDisruptors.toArray(new SpanDisruptor[newDisruptors.size()]);
}
......
......@@ -39,6 +39,7 @@ public class NotifyListenerImpl implements NotifyListener {
public void notify(List<String> currentUrls) {
lock.lock();
try {
//TODO: bug, logic error.
List<String> URL = new ArrayList<>(currentUrls);
if (childrenConnectionURLOfPreviousChanged.size() > URL.size()) {
childrenConnectionURLOfPreviousChanged.removeAll(URL);
......
......@@ -10,6 +10,8 @@ public class Config {
public static class Disruptor{
public static int BUFFER_SIZE = 1024 * 128;
public static int FLUSH_SIZE = 100;
}
......
......@@ -40,14 +40,14 @@ public class IndexOperator {
} catch (Exception e) {
logger.error("Failed to update index.", e);
HealthCollector.getCurrentHeathReading("IndexOperator")
.updateData(HeathReading.ERROR, "Failed to " + "update index.");
.updateData(HeathReading.ERROR, "Failed to update index.");
}
}
BulkResponse bulkRequest = requestBuilder.get();
if (bulkRequest.hasFailures()) {
HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR,
"Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage());
"Failed to update index. Error message : " + bulkRequest.buildFailureMessage());
}
return metaInfos.size();
......
......@@ -4,6 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexOperator;
......@@ -22,10 +23,12 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
private static ILog logger = LogManager.getLogger(StoreAckSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize = 100;
private List<SpanData> buffer = new ArrayList<>(bufferSize);
private int bufferSize;
private List<SpanData> buffer;
public StoreAckSpanEventHandler() {
bufferSize = Config.Disruptor.FLUSH_SIZE;
buffer = new ArrayList<>(bufferSize);
fileWriter = new DataFileWriter();
operator = IndexOperatorFactory.createIndexOperator();
}
......@@ -39,8 +42,10 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
......
......@@ -4,6 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.storage.config.Config;
import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
import com.a.eye.skywalking.storage.data.index.IndexOperator;
......@@ -22,10 +23,12 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
private static ILog logger = LogManager.getLogger(StoreRequestSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize = 100;
private List<SpanData> buffer = new ArrayList<>(bufferSize);
private int bufferSize;
private List<SpanData> buffer;
public StoreRequestSpanEventHandler() {
bufferSize = Config.Disruptor.FLUSH_SIZE;
buffer = new ArrayList<>(bufferSize);
fileWriter = new DataFileWriter();
operator = IndexOperatorFactory.createIndexOperator();
}
......@@ -40,7 +43,10 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册