diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/DisruptorQueueCreator.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/DisruptorQueueCreator.java index 8cdca483bd5ae725bbe06071784c58b64d715b8c..320a22206b51572349bcdab4b81110074070b4e2 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/DisruptorQueueCreator.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/DisruptorQueueCreator.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.queue.disruptor; +import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import org.skywalking.apm.collector.core.queue.DaemonThreadFactory; @@ -25,12 +26,16 @@ import org.skywalking.apm.collector.core.queue.MessageHolder; import org.skywalking.apm.collector.core.queue.QueueCreator; import org.skywalking.apm.collector.core.queue.QueueEventHandler; import org.skywalking.apm.collector.core.queue.QueueExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class DisruptorQueueCreator implements QueueCreator { + private final Logger logger = LoggerFactory.getLogger(DisruptorQueueCreator.class); + @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { // Specify the size of the ring buffer, must be power of 2. if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) { @@ -40,6 +45,20 @@ public class DisruptorQueueCreator implements QueueCreator { // Construct the Disruptor Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); + disruptor.setDefaultExceptionHandler(new ExceptionHandler() { + @Override public void handleEventException(Throwable ex, long sequence, MessageHolder event) { + logger.error("handler message error! message: {}.", event.getMessage(), ex); + } + + @Override public void handleOnStartException(Throwable ex) { + logger.error("create disruptor failed!", ex); + } + + @Override public void handleOnShutdownException(Throwable ex) { + logger.error("shutdown disruptor failed!", ex); + } + }); + RingBuffer ringBuffer = disruptor.getRingBuffer(); DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);