diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/config/Config.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/config/Config.java index bd77768a0b1a2f63f2390333764d398e8f17538b..9b0807752d3b1ee18a457023e06cbcfd601fca70 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/config/Config.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/config/Config.java @@ -17,6 +17,8 @@ public class Config { } public static class Disruptor { - public static int BUFFER_SIZE = 2 ^ 10; + public static int BUFFER_SIZE = 1024 * 128 * 4; + + public static int FLUSH_SIZE = 100; } } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanEventHandler.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractRouteSpanEventHandler.java similarity index 56% rename from skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanEventHandler.java rename to skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractRouteSpanEventHandler.java index 7fae53a449d977e9e5a1389f4650be0ee412ef6d..43cf46a6825f5091edb111b86dd1952aa345b600 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanEventHandler.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractRouteSpanEventHandler.java @@ -3,20 +3,26 @@ package com.a.eye.skywalking.routing.disruptor; import com.a.eye.skywalking.health.report.HealthCollector; import com.a.eye.skywalking.health.report.HeathReading; import com.a.eye.skywalking.network.Client; +import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; import com.a.eye.skywalking.network.listener.client.StorageClientListener; +import com.a.eye.skywalking.routing.config.Config; import com.lmax.disruptor.EventHandler; +import java.util.ArrayList; +import java.util.List; + /** * Created by xin on 2016/11/29. */ -public abstract class AbstractSpanEventHandler implements EventHandler { - protected Client client; - protected int bufferSize = 100; - protected boolean stop; - protected volatile boolean previousSendResult = true; +public abstract class AbstractRouteSpanEventHandler implements EventHandler { + protected Client client; + protected final int bufferSize; + protected boolean stop; + private volatile boolean previousSendFinish = true; - public AbstractSpanEventHandler(String connectionURL) { + public AbstractRouteSpanEventHandler(String connectionURL) { + bufferSize = Config.Disruptor.FLUSH_SIZE; String[] urlSegment = connectionURL.split(":"); if (urlSegment.length != 2) { throw new IllegalArgumentException(); @@ -28,18 +34,16 @@ public abstract class AbstractSpanEventHandler implements EventHandler { SpanStorageClient spanStorageClient = client.newSpanStorageClient(new StorageClientListener() { @Override public void onError(Throwable throwable) { - HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.ERROR, - "Failed to send span. error message :" + throwable.getMessage()); + HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.ERROR, "Failed to send span. error message :" + throwable.getMessage()); } @Override public void onBatchFinished() { - previousSendResult = true; - HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO, - " consumed Successfully"); + previousSendFinish = true; + HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO, " consumed Successfully"); } }); - previousSendResult = false; + previousSendFinish = false; return spanStorageClient; } @@ -49,4 +53,13 @@ public abstract class AbstractSpanEventHandler implements EventHandler { public void stop() { stop = true; } + + public void wait2Finish() { + while (!previousSendFinish) { + try { + Thread.sleep(1L); + } catch (InterruptedException e) { + } + } + } } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanBufferEventHandler.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanBufferEventHandler.java deleted file mode 100644 index a6c0f4d991a2638ba8f7f9276319b55f471ccbf7..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanBufferEventHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.a.eye.skywalking.routing.disruptor.ack; - -import com.a.eye.skywalking.network.grpc.AckSpan; -import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; -import com.a.eye.skywalking.routing.router.RoutingService; -import com.a.eye.skywalking.routing.disruptor.SpanDisruptor; -import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler; - -import java.util.ArrayList; -import java.util.List; - -public class AckSpanBufferEventHandler extends AbstractSpanEventHandler { - private List buffer = new ArrayList<>(bufferSize); - - public AckSpanBufferEventHandler(String connectionURl) { - super(connectionURl); - } - - @Override - public String getExtraId() { - return "AckSpanEventHandler"; - } - - @Override - public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception { - buffer.add(event.getAckSpan()); - - if (stop){ - try { - for (AckSpan ackSpan : buffer) { - SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan); - spanDisruptor.saveSpan(ackSpan); - } - }finally { - buffer.clear(); - } - - return ; - } - - while (!previousSendResult){ - try { - Thread.sleep(10L); - }catch (InterruptedException e){ - } - } - - if (endOfBatch || buffer.size() == bufferSize) { - SpanStorageClient spanStorageClient = getStorageClient(); - spanStorageClient.sendACKSpan(buffer); - buffer.clear(); - } - } -} diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java index 35ff402bb95ed88b589bb90a49495e784b78b54f..a110d7c739be79e70136f31f6afd68f7b6fc9b82 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java @@ -18,11 +18,11 @@ public class AckSpanDisruptor { private Disruptor ackSpanDisruptor; private RingBuffer ackSpanRingBuffer; - private AckSpanBufferEventHandler ackSpanEventHandler; + private RouteAckSpanBufferEventHandler ackSpanEventHandler; public AckSpanDisruptor(String connectionURL) { ackSpanDisruptor = new Disruptor(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); - ackSpanEventHandler = new AckSpanBufferEventHandler(connectionURL); + ackSpanEventHandler = new RouteAckSpanBufferEventHandler(connectionURL); ackSpanDisruptor.handleEventsWith(ackSpanEventHandler); ackSpanDisruptor.start(); ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer(); diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/RouteAckSpanBufferEventHandler.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/RouteAckSpanBufferEventHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..f2c532915880735892aa2ac21fb1aa4103b65a4a --- /dev/null +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/RouteAckSpanBufferEventHandler.java @@ -0,0 +1,62 @@ +package com.a.eye.skywalking.routing.disruptor.ack; + +import com.a.eye.skywalking.health.report.HealthCollector; +import com.a.eye.skywalking.health.report.HeathReading; +import com.a.eye.skywalking.logging.api.ILog; +import com.a.eye.skywalking.logging.api.LogManager; +import com.a.eye.skywalking.network.grpc.AckSpan; +import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; +import com.a.eye.skywalking.routing.disruptor.AbstractRouteSpanEventHandler; +import com.a.eye.skywalking.routing.disruptor.SpanDisruptor; +import com.a.eye.skywalking.routing.router.RoutingService; + +import java.util.ArrayList; +import java.util.List; + +public class RouteAckSpanBufferEventHandler extends AbstractRouteSpanEventHandler { + private static ILog logger = LogManager.getLogger(RouteAckSpanBufferEventHandler.class); + private final List buffer; + + public RouteAckSpanBufferEventHandler(String connectionURl) { + super(connectionURl); + buffer = new ArrayList<>(bufferSize); + } + + @Override + public String getExtraId() { + return "AckSpanEventHandler"; + } + + @Override + public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception { + buffer.add(event.getAckSpan()); + + if (stop) { + try { + for (AckSpan ackSpan : buffer) { + SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan); + spanDisruptor.saveSpan(ackSpan); + } + } finally { + buffer.clear(); + } + + return; + } + + wait2Finish(); + + if (endOfBatch || buffer.size() == bufferSize) { + try { + SpanStorageClient spanStorageClient = getStorageClient(); + spanStorageClient.sendACKSpan(buffer); + HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size()); + } catch (Throwable e) { + logger.error("Ack messages consume failure.", e); + HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size()); + } finally { + buffer.clear(); + } + } + } +} diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java index ed2cebbdb7a805f115525be915dbba8687d4ef0e..c33f6ae6bd47b2ab768c598d99aba7c6c16c952b 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java @@ -15,13 +15,13 @@ import com.lmax.disruptor.util.DaemonThreadFactory; */ public class RequestSpanDisruptor { private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class); - private Disruptor requestSpanDisruptor; - private RingBuffer requestSpanRingBuffer; - private SendRequestSpanEventHandler eventHandler; + private Disruptor requestSpanDisruptor; + private RingBuffer requestSpanRingBuffer; + private RouteSendRequestSpanEventHandler eventHandler; public RequestSpanDisruptor(String connectionURL) { requestSpanDisruptor = new Disruptor(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); - eventHandler = new SendRequestSpanEventHandler(connectionURL); + eventHandler = new RouteSendRequestSpanEventHandler(connectionURL); requestSpanDisruptor.handleEventsWith(eventHandler); requestSpanDisruptor.start(); requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer(); diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RouteSendRequestSpanEventHandler.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RouteSendRequestSpanEventHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..9f238a418c7da21185e0236d6f57fb33d61312a9 --- /dev/null +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RouteSendRequestSpanEventHandler.java @@ -0,0 +1,66 @@ +package com.a.eye.skywalking.routing.disruptor.request; + + +import com.a.eye.skywalking.health.report.HealthCollector; +import com.a.eye.skywalking.health.report.HeathReading; +import com.a.eye.skywalking.logging.api.ILog; +import com.a.eye.skywalking.logging.api.LogManager; +import com.a.eye.skywalking.network.grpc.RequestSpan; +import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; +import com.a.eye.skywalking.routing.disruptor.SpanDisruptor; +import com.a.eye.skywalking.routing.router.RoutingService; +import com.a.eye.skywalking.routing.disruptor.AbstractRouteSpanEventHandler; + +import java.util.ArrayList; +import java.util.List; + +/** + * Created by xin on 2016/11/27. + */ +public class RouteSendRequestSpanEventHandler extends AbstractRouteSpanEventHandler { + private static ILog logger = LogManager.getLogger(RouteSendRequestSpanEventHandler.class); + private final List buffer; + + public RouteSendRequestSpanEventHandler(String connectionURl) { + super(connectionURl); + buffer = new ArrayList<>(bufferSize); + } + + @Override + public String getExtraId() { + return "RequestSpanEventHandler"; + } + + @Override + public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception { + buffer.add(event.getRequestSpan()); + + if (stop) { + try { + for (RequestSpan ackSpan : buffer) { + SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan); + spanDisruptor.saveSpan(ackSpan); + } + } finally { + buffer.clear(); + } + + return; + } + + wait2Finish(); + + if (endOfBatch || buffer.size() == bufferSize) { + try { + SpanStorageClient spanStorageClient = getStorageClient(); + spanStorageClient.sendRequestSpan(buffer); + HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size()); + } catch (Throwable e) { + logger.error("RequestSpan messages consume failure.", e); + HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size()); + } finally { + buffer.clear(); + } + } + } +} diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/SendRequestSpanEventHandler.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/SendRequestSpanEventHandler.java deleted file mode 100644 index c86652d4b9f3e33cf2587f5fd6df003317fb13d7..0000000000000000000000000000000000000000 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/SendRequestSpanEventHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.a.eye.skywalking.routing.disruptor.request; - - -import com.a.eye.skywalking.network.grpc.RequestSpan; -import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; -import com.a.eye.skywalking.routing.disruptor.SpanDisruptor; -import com.a.eye.skywalking.routing.router.RoutingService; -import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler; - -import java.util.ArrayList; -import java.util.List; - -/** - * Created by xin on 2016/11/27. - */ -public class SendRequestSpanEventHandler extends AbstractSpanEventHandler { - private List buffer = new ArrayList<>(bufferSize); - - public SendRequestSpanEventHandler(String connectionURl) { - super(connectionURl); - } - - @Override - public String getExtraId() { - return "RequestSpanEventHandler"; - } - - @Override - public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception { - buffer.add(event.getRequestSpan()); - - if (stop) { - try { - for (RequestSpan ackSpan : buffer) { - SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan); - spanDisruptor.saveSpan(ackSpan); - } - } finally { - buffer.clear(); - } - - return; - } - - while (!previousSendResult) { - try { - Thread.sleep(10L); - } catch (InterruptedException e) { - } - } - - if (endOfBatch || buffer.size() == bufferSize) { - SpanStorageClient spanStorageClient = getStorageClient(); - spanStorageClient.sendRequestSpan(buffer); - buffer.clear(); - } - } -} diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java index 1459e26b670cd77936c6ad342683fe7c54574fb2..2d3e91fb184bf00e37f85ae1254379503810dccf 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java @@ -76,6 +76,7 @@ public class Router implements NodeChangesListener { } } }); + //TODO: BUG, no data release. disruptors = newDisruptors.toArray(new SpanDisruptor[newDisruptors.size()]); } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/storage/listener/NotifyListenerImpl.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/storage/listener/NotifyListenerImpl.java index 6867d044602ae583bffd8d37bbf59e215420c932..d7c9c417740a270b81fdd2f13ddbcade2ee5ded7 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/storage/listener/NotifyListenerImpl.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/storage/listener/NotifyListenerImpl.java @@ -39,6 +39,7 @@ public class NotifyListenerImpl implements NotifyListener { public void notify(List currentUrls) { lock.lock(); try { + //TODO: bug, logic error. List URL = new ArrayList<>(currentUrls); if (childrenConnectionURLOfPreviousChanged.size() > URL.size()) { childrenConnectionURLOfPreviousChanged.removeAll(URL); diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java index b92d7eed5acae2d04cc8c17e31a99b01c25fabfa..f911e39e1e6f9889f56a71626e669e6841627a1f 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java @@ -10,6 +10,8 @@ public class Config { public static class Disruptor{ public static int BUFFER_SIZE = 1024 * 128; + + public static int FLUSH_SIZE = 100; } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java index c632d670794f75b98a6a16d3f10f9291b2daaed2..daf030082d52b3fde5051e936928253e05af178d 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperator.java @@ -40,14 +40,14 @@ public class IndexOperator { } catch (Exception e) { logger.error("Failed to update index.", e); HealthCollector.getCurrentHeathReading("IndexOperator") - .updateData(HeathReading.ERROR, "Failed to " + "update index."); + .updateData(HeathReading.ERROR, "Failed to update index."); } } BulkResponse bulkRequest = requestBuilder.get(); if (bulkRequest.hasFailures()) { HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR, - "Failed to " + "update index. Error message : " + bulkRequest.buildFailureMessage()); + "Failed to update index. Error message : " + bulkRequest.buildFailureMessage()); } return metaInfos.size(); diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java index 4f361b404da2e373906633feb6148b35d56460e8..a1496a15d147066a4c58d439906eb6d82243e38b 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java @@ -4,6 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector; import com.a.eye.skywalking.health.report.HeathReading; import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; +import com.a.eye.skywalking.storage.config.Config; import com.a.eye.skywalking.storage.data.file.DataFileWriter; import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; import com.a.eye.skywalking.storage.data.index.IndexOperator; @@ -22,10 +23,12 @@ public class StoreAckSpanEventHandler implements EventHandler { private static ILog logger = LogManager.getLogger(StoreAckSpanEventHandler.class); private DataFileWriter fileWriter; private IndexOperator operator; - private int bufferSize = 100; - private List buffer = new ArrayList<>(bufferSize); + private int bufferSize; + private List buffer; public StoreAckSpanEventHandler() { + bufferSize = Config.Disruptor.FLUSH_SIZE; + buffer = new ArrayList<>(bufferSize); fileWriter = new DataFileWriter(); operator = IndexOperatorFactory.createIndexOperator(); } @@ -39,8 +42,10 @@ public class StoreAckSpanEventHandler implements EventHandler { IndexMetaCollection collection = fileWriter.write(buffer); operator.batchUpdate(collection); - - HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size()); + HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size()); + } catch (Throwable e) { + logger.error("Ack messages consume failure.", e); + HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size()); } finally { buffer.clear(); } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java index 3b7371c042ce6aa717fe20bfb579cdc57625420c..be4d69c2b42b8de8e26732a29efbddf6f25d0fcc 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java @@ -4,6 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector; import com.a.eye.skywalking.health.report.HeathReading; import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; +import com.a.eye.skywalking.storage.config.Config; import com.a.eye.skywalking.storage.data.file.DataFileWriter; import com.a.eye.skywalking.storage.data.index.IndexMetaCollection; import com.a.eye.skywalking.storage.data.index.IndexOperator; @@ -22,10 +23,12 @@ public class StoreRequestSpanEventHandler implements EventHandler buffer = new ArrayList<>(bufferSize); + private int bufferSize; + private List buffer; public StoreRequestSpanEventHandler() { + bufferSize = Config.Disruptor.FLUSH_SIZE; + buffer = new ArrayList<>(bufferSize); fileWriter = new DataFileWriter(); operator = IndexOperatorFactory.createIndexOperator(); } @@ -40,7 +43,10 @@ public class StoreRequestSpanEventHandler implements EventHandler