提交 e5c9e295 编写于 作者: P peng-yongsheng

Distruptor queue exits unexpectedly, collector hangs up

上级 6dd6e9c4
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.queue.disruptor.base; package org.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import org.skywalking.apm.collector.queue.base.DaemonThreadFactory; import org.skywalking.apm.collector.queue.base.DaemonThreadFactory;
...@@ -25,12 +26,16 @@ import org.skywalking.apm.collector.queue.base.MessageHolder; ...@@ -25,12 +26,16 @@ import org.skywalking.apm.collector.queue.base.MessageHolder;
import org.skywalking.apm.collector.queue.base.QueueCreator; import org.skywalking.apm.collector.queue.base.QueueCreator;
import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class DisruptorQueueCreator implements QueueCreator { public class DisruptorQueueCreator implements QueueCreator {
private final Logger logger = LoggerFactory.getLogger(DisruptorQueueCreator.class);
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) { @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
// Specify the size of the ring buffer, must be power of 2. // Specify the size of the ring buffer, must be power of 2.
if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) { if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
...@@ -39,6 +44,19 @@ public class DisruptorQueueCreator implements QueueCreator { ...@@ -39,6 +44,19 @@ public class DisruptorQueueCreator implements QueueCreator {
// Construct the Disruptor // Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor<>(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); Disruptor<MessageHolder> disruptor = new Disruptor<>(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);
disruptor.setDefaultExceptionHandler(new ExceptionHandler<MessageHolder>() {
@Override public void handleEventException(Throwable ex, long sequence, MessageHolder event) {
logger.error("Handle disruptor error event! message: {}.", event.getMessage(), ex);
}
@Override public void handleOnStartException(Throwable ex) {
logger.error("create disruptor queue failed!", ex);
}
@Override public void handleOnShutdownException(Throwable ex) {
logger.error("shutdown disruptor queue failed!", ex);
}
});
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer(); RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor); DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册