未验证 提交 888cef74 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(core): fix SO in concurrent FanOut.and (#1460)

上级 826f3e67
...@@ -61,6 +61,8 @@ public class FanOut implements Barrier { ...@@ -61,6 +61,8 @@ public class FanOut implements Barrier {
public FanOut and(Barrier barrier) { public FanOut and(Barrier barrier) {
Holder _new; Holder _new;
boolean barrierNotSetUp = true;
do { do {
Holder h = this.holder; Holder h = this.holder;
// read barrier to make sure "holder" read doesn't fall below this // read barrier to make sure "holder" read doesn't fall below this
...@@ -68,8 +70,9 @@ public class FanOut implements Barrier { ...@@ -68,8 +70,9 @@ public class FanOut implements Barrier {
if (h.barriers.indexOf(barrier) > -1) { if (h.barriers.indexOf(barrier) > -1) {
return this; return this;
} }
if (this.barrier != null) { if (barrierNotSetUp && this.barrier != null) {
barrier.root().setBarrier(this.barrier); barrier.root().setBarrier(this.barrier);
barrierNotSetUp = false;
} }
_new = new Holder(); _new = new Holder();
_new.barriers.addAll(h.barriers); _new.barriers.addAll(h.barriers);
......
...@@ -35,6 +35,7 @@ import java.util.Arrays; ...@@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
public class ConcurrentTest { public class ConcurrentTest {
...@@ -389,6 +390,41 @@ public class ConcurrentTest { ...@@ -389,6 +390,41 @@ public class ConcurrentTest {
} }
} }
@Test
public void testConcurrentFanOutAnd() {
int cycle = 1024;
SPSequence pubSeq = new SPSequence(cycle);
FanOut fout = new FanOut();
pubSeq.then(fout).then(pubSeq);
int threads = 2;
CyclicBarrier start = new CyclicBarrier(threads);
SOCountDownLatch latch = new SOCountDownLatch(threads);
int iterations = 30;
AtomicInteger doneCount = new AtomicInteger();
for(int i = 0; i < threads; i++) {
new Thread(() -> {
try {
start.await();
for(int j = 0; j < iterations; j++) {
SCSequence consumer = new SCSequence();
FanOut fout2 = fout.and(consumer);
fout2.remove(consumer);
}
doneCount.addAndGet(iterations);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
Assert.assertEquals(threads * iterations, doneCount.get());
}
static void publishEOE(RingQueue<Event> queue, Sequence sequence) { static void publishEOE(RingQueue<Event> queue, Sequence sequence) {
long cursor = sequence.nextBully(); long cursor = sequence.nextBully();
queue.get(cursor).value = Integer.MIN_VALUE; queue.get(cursor).value = Integer.MIN_VALUE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册