提交 528226ef 编写于 作者: A ascrutae

Merge remote-tracking branch 'origin/master'

package com.a.eye.skywalking.routing.disruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
/**
* Created by wusheng on 2016/12/4.
*/
public class AbstractSpanDisruptor {
private volatile boolean isRunning = true;
public long getRingBufferSequence(RingBuffer buffer){
long sequence;
while(true) {
try {
if(!isRunning){
return -1;
}
sequence = buffer.tryNext();
break;
} catch (InsufficientCapacityException e) {
try {
Thread.sleep(1L);
} catch (InterruptedException e1) {
}
}
}
return sequence;
}
protected boolean isShutdown(){
return !isRunning;
}
protected void shutdown(){
isRunning = false;
}
}
......@@ -7,6 +7,11 @@ import com.a.eye.skywalking.network.grpc.RequestSpan;
* Created by xin on 2016/11/29.
*/
public class NoopSpanDisruptor extends SpanDisruptor {
public static NoopSpanDisruptor INSTANCE = new NoopSpanDisruptor();
private NoopSpanDisruptor() {
}
@Override
public boolean saveSpan(AckSpan ackSpan) {
......
......@@ -48,7 +48,7 @@ public class SpanDisruptor {
}
public void shutdown() {
ackSpanDisruptor.shutdown();
requestSpanDisruptor.shutdown();
ackSpanDisruptor.shutdown();
}
}
......@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
......@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by xin on 2016/11/29.
*/
public class AckSpanDisruptor {
public class AckSpanDisruptor extends AbstractSpanDisruptor {
private static ILog logger = LogManager.getLogger(AckSpanDisruptor.class);
private Disruptor<AckSpanHolder> ackSpanDisruptor;
private RingBuffer<AckSpanHolder> ackSpanRingBuffer;
......@@ -29,7 +31,11 @@ public class AckSpanDisruptor {
}
public boolean saveAckSpan(AckSpan ackSpan) {
long sequence = ackSpanRingBuffer.next();
long sequence = this.getRingBufferSequence(ackSpanRingBuffer);
if(this.isShutdown()){
return false;
}
try {
AckSpanHolder data = ackSpanRingBuffer.get(sequence);
data.setAckSpan(ackSpan);
......@@ -44,9 +50,10 @@ public class AckSpanDisruptor {
}
}
public void shutdown() {
super.shutdown();
ackSpanEventHandler.stop();
ackSpanDisruptor.shutdown();
}
}
......@@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
......@@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by xin on 2016/11/29.
*/
public class RequestSpanDisruptor {
public class RequestSpanDisruptor extends AbstractSpanDisruptor {
private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class);
private Disruptor<RequestSpanHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanHolder> requestSpanRingBuffer;
......@@ -28,8 +30,13 @@ public class RequestSpanDisruptor {
}
public boolean saveRequestSpan(RequestSpan requestSpan) {
long sequence = requestSpanRingBuffer.next();
long sequence = this.getRingBufferSequence(requestSpanRingBuffer);
if(this.isShutdown()){
return false;
}
try {
RequestSpanHolder data = requestSpanRingBuffer.get(sequence);
data.setRequestSpan(requestSpan);
......@@ -44,7 +51,8 @@ public class RequestSpanDisruptor {
}
}
public void shutDown() {
public void shutdown() {
super.shutdown();
eventHandler.stop();
requestSpanDisruptor.shutdown();
}
......
......@@ -19,7 +19,6 @@ public class Router implements NodeChangesListener {
private static ILog logger = LogManager.getLogger(Router.class);
private SpanDisruptor[] disruptors = new SpanDisruptor[0];
private NoopSpanDisruptor noopSpanPool = new NoopSpanDisruptor();
public SpanDisruptor lookup(RequestSpan requestSpan) {
return getSpanDisruptor(requestSpan.getRouteKey());
......@@ -31,7 +30,7 @@ public class Router implements NodeChangesListener {
private SpanDisruptor getSpanDisruptor(long routKey) {
if (disruptors.length == 0) {
return noopSpanPool;
return NoopSpanDisruptor.INSTANCE;
}
while (true) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册