提交 fdaa3186 编写于 作者: wu-sheng's avatar wu-sheng

add AbstractSpanDisruptor to avoid endless blocking of next()

上级 6fd3ce2d
package com.a.eye.skywalking.routing.disruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
/**
* Created by wusheng on 2016/12/4.
*/
public class AbstractSpanDisruptor {
private volatile boolean isRunning = true;
public long getRingBufferSequence(RingBuffer buffer){
long sequence;
while(true) {
try {
if(!isRunning){
return -1;
}
sequence = buffer.tryNext();
break;
} catch (InsufficientCapacityException e) {
try {
Thread.sleep(1L);
} catch (InterruptedException e1) {
}
}
}
return sequence;
}
protected boolean isShutdown(){
return !isRunning;
}
protected void shutdown(){
isRunning = false;
}
}
...@@ -7,6 +7,11 @@ import com.a.eye.skywalking.network.grpc.RequestSpan; ...@@ -7,6 +7,11 @@ import com.a.eye.skywalking.network.grpc.RequestSpan;
* Created by xin on 2016/11/29. * Created by xin on 2016/11/29.
*/ */
public class NoopSpanDisruptor extends SpanDisruptor { public class NoopSpanDisruptor extends SpanDisruptor {
public static NoopSpanDisruptor INSTANCE = new NoopSpanDisruptor();
private NoopSpanDisruptor() {
}
@Override @Override
public boolean saveSpan(AckSpan ackSpan) { public boolean saveSpan(AckSpan ackSpan) {
......
...@@ -48,7 +48,7 @@ public class SpanDisruptor { ...@@ -48,7 +48,7 @@ public class SpanDisruptor {
} }
public void shutdown() { public void shutdown() {
ackSpanDisruptor.shutdown(); requestSpanDisruptor.shutdown();
ackSpanDisruptor.shutdown(); ackSpanDisruptor.shutdown();
} }
} }
...@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog; ...@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan; import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.routing.config.Config; import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory; import com.lmax.disruptor.util.DaemonThreadFactory;
...@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory; ...@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
/** /**
* Created by xin on 2016/11/29. * Created by xin on 2016/11/29.
*/ */
public class AckSpanDisruptor { public class AckSpanDisruptor extends AbstractSpanDisruptor {
private static ILog logger = LogManager.getLogger(AckSpanDisruptor.class); private static ILog logger = LogManager.getLogger(AckSpanDisruptor.class);
private Disruptor<AckSpanHolder> ackSpanDisruptor; private Disruptor<AckSpanHolder> ackSpanDisruptor;
private RingBuffer<AckSpanHolder> ackSpanRingBuffer; private RingBuffer<AckSpanHolder> ackSpanRingBuffer;
...@@ -29,7 +31,11 @@ public class AckSpanDisruptor { ...@@ -29,7 +31,11 @@ public class AckSpanDisruptor {
} }
public boolean saveAckSpan(AckSpan ackSpan) { public boolean saveAckSpan(AckSpan ackSpan) {
long sequence = ackSpanRingBuffer.next(); long sequence = this.getRingBufferSequence(ackSpanRingBuffer);
if(this.isShutdown()){
return false;
}
try { try {
AckSpanHolder data = ackSpanRingBuffer.get(sequence); AckSpanHolder data = ackSpanRingBuffer.get(sequence);
data.setAckSpan(ackSpan); data.setAckSpan(ackSpan);
...@@ -44,9 +50,10 @@ public class AckSpanDisruptor { ...@@ -44,9 +50,10 @@ public class AckSpanDisruptor {
} }
} }
public void shutdown() { public void shutdown() {
super.shutdown();
ackSpanEventHandler.stop(); ackSpanEventHandler.stop();
ackSpanDisruptor.shutdown(); ackSpanDisruptor.shutdown();
} }
} }
...@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog; ...@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.routing.config.Config; import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory; import com.lmax.disruptor.util.DaemonThreadFactory;
...@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory; ...@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
/** /**
* Created by xin on 2016/11/29. * Created by xin on 2016/11/29.
*/ */
public class RequestSpanDisruptor { public class RequestSpanDisruptor extends AbstractSpanDisruptor {
private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class); private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class);
private Disruptor<RequestSpanHolder> requestSpanDisruptor; private Disruptor<RequestSpanHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanHolder> requestSpanRingBuffer; private RingBuffer<RequestSpanHolder> requestSpanRingBuffer;
...@@ -28,8 +30,13 @@ public class RequestSpanDisruptor { ...@@ -28,8 +30,13 @@ public class RequestSpanDisruptor {
} }
public boolean saveRequestSpan(RequestSpan requestSpan) { public boolean saveRequestSpan(RequestSpan requestSpan) {
long sequence = requestSpanRingBuffer.next(); long sequence = this.getRingBufferSequence(requestSpanRingBuffer);
if(this.isShutdown()){
return false;
}
try { try {
RequestSpanHolder data = requestSpanRingBuffer.get(sequence); RequestSpanHolder data = requestSpanRingBuffer.get(sequence);
data.setRequestSpan(requestSpan); data.setRequestSpan(requestSpan);
...@@ -44,7 +51,8 @@ public class RequestSpanDisruptor { ...@@ -44,7 +51,8 @@ public class RequestSpanDisruptor {
} }
} }
public void shutDown() { public void shutdown() {
super.shutdown();
eventHandler.stop(); eventHandler.stop();
requestSpanDisruptor.shutdown(); requestSpanDisruptor.shutdown();
} }
......
...@@ -19,7 +19,6 @@ public class Router implements NodeChangesListener { ...@@ -19,7 +19,6 @@ public class Router implements NodeChangesListener {
private static ILog logger = LogManager.getLogger(Router.class); private static ILog logger = LogManager.getLogger(Router.class);
private SpanDisruptor[] disruptors = new SpanDisruptor[0]; private SpanDisruptor[] disruptors = new SpanDisruptor[0];
private NoopSpanDisruptor noopSpanPool = new NoopSpanDisruptor();
public SpanDisruptor lookup(RequestSpan requestSpan) { public SpanDisruptor lookup(RequestSpan requestSpan) {
return getSpanDisruptor(requestSpan.getRouteKey()); return getSpanDisruptor(requestSpan.getRouteKey());
...@@ -31,7 +30,7 @@ public class Router implements NodeChangesListener { ...@@ -31,7 +30,7 @@ public class Router implements NodeChangesListener {
private SpanDisruptor getSpanDisruptor(long routKey) { private SpanDisruptor getSpanDisruptor(long routKey) {
if (disruptors.length == 0) { if (disruptors.length == 0) {
return noopSpanPool; return NoopSpanDisruptor.INSTANCE;
} }
while (true) { while (true) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册