diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanDisruptor.java new file mode 100644 index 0000000000000000000000000000000000000000..82cbda6d475978a692317e3601f0f627cfb7fc25 --- /dev/null +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/AbstractSpanDisruptor.java @@ -0,0 +1,39 @@ +package com.a.eye.skywalking.routing.disruptor; + +import com.lmax.disruptor.InsufficientCapacityException; +import com.lmax.disruptor.RingBuffer; + +/** + * Created by wusheng on 2016/12/4. + */ +public class AbstractSpanDisruptor { + private volatile boolean isRunning = true; + + public long getRingBufferSequence(RingBuffer buffer){ + long sequence; + while(true) { + try { + if(!isRunning){ + return -1; + } + + sequence = buffer.tryNext(); + break; + } catch (InsufficientCapacityException e) { + try { + Thread.sleep(1L); + } catch (InterruptedException e1) { + } + } + } + return sequence; + } + + protected boolean isShutdown(){ + return !isRunning; + } + + protected void shutdown(){ + isRunning = false; + } +} diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/NoopSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/NoopSpanDisruptor.java index f6b8ea1848b845aad75930a426366c176535786e..a8a92acdb536576e3f5afd7f1ce06ccec5002bd5 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/NoopSpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/NoopSpanDisruptor.java @@ -7,6 +7,11 @@ import com.a.eye.skywalking.network.grpc.RequestSpan; * Created by xin on 2016/11/29. */ public class NoopSpanDisruptor extends SpanDisruptor { + public static NoopSpanDisruptor INSTANCE = new NoopSpanDisruptor(); + + private NoopSpanDisruptor() { + + } @Override public boolean saveSpan(AckSpan ackSpan) { diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/SpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/SpanDisruptor.java index 4db6d0dab322a8b4f080f52e88c0ad775f0e937c..da8ee87b6c0dc47a0335e555146fbe7269c69381 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/SpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/SpanDisruptor.java @@ -48,7 +48,7 @@ public class SpanDisruptor { } public void shutdown() { - ackSpanDisruptor.shutdown(); + requestSpanDisruptor.shutdown(); ackSpanDisruptor.shutdown(); } } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java index cfcc72f6ab7ad4b6b248216902e8b4f11477e48d..efa05f08dfeaedd870672e185beee266adf9247a 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/AckSpanDisruptor.java @@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.network.grpc.AckSpan; import com.a.eye.skywalking.routing.config.Config; +import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor; +import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; @@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory; /** * Created by xin on 2016/11/29. */ -public class AckSpanDisruptor { +public class AckSpanDisruptor extends AbstractSpanDisruptor { private static ILog logger = LogManager.getLogger(AckSpanDisruptor.class); private Disruptor ackSpanDisruptor; private RingBuffer ackSpanRingBuffer; @@ -29,7 +31,11 @@ public class AckSpanDisruptor { } public boolean saveAckSpan(AckSpan ackSpan) { - long sequence = ackSpanRingBuffer.next(); + long sequence = this.getRingBufferSequence(ackSpanRingBuffer); + if(this.isShutdown()){ + return false; + } + try { AckSpanHolder data = ackSpanRingBuffer.get(sequence); data.setAckSpan(ackSpan); @@ -44,9 +50,10 @@ public class AckSpanDisruptor { } } + public void shutdown() { + super.shutdown(); ackSpanEventHandler.stop(); - ackSpanDisruptor.shutdown(); } } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java index c33f6ae6bd47b2ab768c598d99aba7c6c16c952b..c57ae497b218ef5c1a74e889a79d1a1b67cde98a 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RequestSpanDisruptor.java @@ -6,6 +6,8 @@ import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.routing.config.Config; +import com.a.eye.skywalking.routing.disruptor.AbstractSpanDisruptor; +import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; @@ -13,7 +15,7 @@ import com.lmax.disruptor.util.DaemonThreadFactory; /** * Created by xin on 2016/11/29. */ -public class RequestSpanDisruptor { +public class RequestSpanDisruptor extends AbstractSpanDisruptor { private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class); private Disruptor requestSpanDisruptor; private RingBuffer requestSpanRingBuffer; @@ -28,8 +30,13 @@ public class RequestSpanDisruptor { } public boolean saveRequestSpan(RequestSpan requestSpan) { - long sequence = requestSpanRingBuffer.next(); + long sequence = this.getRingBufferSequence(requestSpanRingBuffer); + if(this.isShutdown()){ + return false; + } + try { + RequestSpanHolder data = requestSpanRingBuffer.get(sequence); data.setRequestSpan(requestSpan); @@ -44,7 +51,8 @@ public class RequestSpanDisruptor { } } - public void shutDown() { + public void shutdown() { + super.shutdown(); eventHandler.stop(); requestSpanDisruptor.shutdown(); } diff --git a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java index ffc29b7ad7a15fd4b6d7767fb1a99d081697aeae..71023b6c8cf435a0015edbbb54eff32c30c3d694 100644 --- a/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java +++ b/skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/router/Router.java @@ -19,7 +19,6 @@ public class Router implements NodeChangesListener { private static ILog logger = LogManager.getLogger(Router.class); private SpanDisruptor[] disruptors = new SpanDisruptor[0]; - private NoopSpanDisruptor noopSpanPool = new NoopSpanDisruptor(); public SpanDisruptor lookup(RequestSpan requestSpan) { return getSpanDisruptor(requestSpan.getRouteKey()); @@ -31,7 +30,7 @@ public class Router implements NodeChangesListener { private SpanDisruptor getSpanDisruptor(long routKey) { if (disruptors.length == 0) { - return noopSpanPool; + return NoopSpanDisruptor.INSTANCE; } while (true) {