未验证 提交 275990df 编写于 作者: A Andrey Pechkurov 提交者: GitHub

chore(test): remove testOneToParallelSubscriber test (#1558)

上级 bbe6c729
......@@ -28,7 +28,6 @@ import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.LongList;
import io.questdb.std.Numbers;
import io.questdb.std.Os;
import io.questdb.std.Rnd;
import org.junit.Assert;
import org.junit.Test;
......@@ -144,7 +143,8 @@ public class ConcurrentTest {
pingPong.add(pingCursor);
System.out.println("* ping " + requestId);
// Uncomment this and the following lines when in need for debugging
// System.out.println("* ping " + requestId);
long pongCursor;
while ((pongCursor = pongPubSeq.next()) < 0) {
......@@ -154,7 +154,7 @@ public class ConcurrentTest {
pongPubSeq.done(pongCursor);
pingPong.add(pongCursor);
System.out.println("* pong " + requestId);
// System.out.println("* pong " + requestId);
i++;
} else {
LockSupport.parkNanos(10);
......@@ -185,7 +185,7 @@ public class ConcurrentTest {
for (int i = 0; i < iterations; i++) {
// Put local response sequence into response FanOut
System.out.println("thread:" + threadId + ", added at " + pingPubSeq.value);
// System.out.println("thread:" + threadId + ", added at " + pingPubSeq.value);
// Send next request
long requestId = idGen.incrementAndGet();
......@@ -197,7 +197,7 @@ public class ConcurrentTest {
pingPubSeq.done(pingCursor);
pingPong.add(pingCursor);
System.out.println("thread:" + threadId + ", ask: " + requestId);
// System.out.println("thread:" + threadId + ", ask: " + requestId);
// Wait for response
long responseId, pongCursor;
......@@ -209,10 +209,10 @@ public class ConcurrentTest {
responseId = pongQueue.get(pongCursor).correlationId;
pongSubSeq.done(pongCursor);
pingPong.add(pongCursor);
System.out.println("thread:" + threadId + ", ping: " + responseId + ", expected: " + requestId);
// System.out.println("thread:" + threadId + ", ping: " + responseId + ", expected: " + requestId);
} while (responseId != requestId);
System.out.println("thread " + threadId + ", pong " + requestId);
// System.out.println("thread " + threadId + ", pong " + requestId);
// Remove local response sequence from response FanOut
}
pongSubFo.remove(pongSubSeq);
......@@ -267,7 +267,8 @@ public class ConcurrentTest {
long requestId = msg.correlationId;
pingSubSeq.done(seq);
LOG.info().$("ping received ").$(requestId).$();
// Uncomment this and the following lines when in need for debugging
// LOG.info().$("ping received ").$(requestId).$();
long resp;
while ((resp = pongPubSeq.next()) < 0) {
......@@ -276,7 +277,7 @@ public class ConcurrentTest {
pongQueue.get(resp).correlationId = requestId;
pongPubSeq.done(resp);
LOG.info().$("pong sent ").$(requestId).$();
// LOG.info().$("pong sent ").$(requestId).$();
i++;
} else {
LockSupport.parkNanos(10);
......@@ -307,7 +308,7 @@ public class ConcurrentTest {
}
pingQueue.get(reqSeq).correlationId = requestId;
pingPubSeq.done(reqSeq);
LOG.info().$(threadId).$(", ping sent ").$(requestId).$();
// LOG.info().$(threadId).$(", ping sent ").$(requestId).$();
// Wait for response
long responseId, respCursor;
......@@ -319,7 +320,7 @@ public class ConcurrentTest {
pongSubSeq.done(respCursor);
} while (responseId != requestId);
LOG.info().$(threadId).$(", pong received ").$(requestId).$();
// LOG.info().$(threadId).$(", pong received ").$(requestId).$();
// Remove local response sequence from response FanOut
pongSubFo.remove(pongSubSeq);
......@@ -701,62 +702,6 @@ public class ConcurrentTest {
}
}
@Test
public void testOneToParallelSubscriber() throws Exception {
LOG.info().$("testOneToParallelSubscriber").$();
int cycle = 1024;
int size = 1024 * cycle;
RingQueue<Event> queue = new RingQueue<>(Event.FACTORY, cycle);
SPSequence pubSeq = new SPSequence(cycle);
Sequence sub1 = new SCSequence();
Sequence sub2 = new SCSequence();
FanOut fanOut = FanOut.to(sub1).and(sub2);
pubSeq.then(fanOut).then(pubSeq);
CyclicBarrier barrier = new CyclicBarrier(4);
CountDownLatch latch = new CountDownLatch(3);
BusyConsumer[] consumers = new BusyConsumer[2];
consumers[0] = new BusyConsumer(size, sub1, queue, barrier, latch);
consumers[1] = new BusyConsumer(size, sub2, queue, barrier, latch);
BusySubscriber subscriber = new BusySubscriber(queue, barrier, latch, fanOut);
subscriber.start();
consumers[0].start();
consumers[1].start();
barrier.await();
int i = 0;
while (true) {
long cursor = pubSeq.next();
if (cursor < 0) {
continue;
}
queue.get(cursor).value = i++;
pubSeq.done(cursor);
if (i == size) {
break;
}
}
publishEOE(queue, pubSeq);
publishEOE(queue, pubSeq);
latch.await();
for (int k = 0; k < 2; k++) {
for (i = 0; i < consumers[k].buf.length; i++) {
Assert.assertEquals(i, consumers[k].buf[i]);
}
}
for (i = 0; i < subscriber.buf.length; i++) {
Assert.assertTrue(subscriber.buf[i] > 0);
}
}
static void publishEOE(RingQueue<Event> queue, Sequence sequence) {
long cursor = sequence.nextBully();
queue.get(cursor).value = Integer.MIN_VALUE;
......@@ -772,15 +717,15 @@ public class ConcurrentTest {
private final int[] buf;
private final RingQueue<Event> queue;
private final CyclicBarrier barrier;
private final CountDownLatch latch;
private final CountDownLatch doneLatch;
private volatile int finalIndex = 0;
BusyConsumer(int cycle, Sequence sequence, RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch latch) {
BusyConsumer(int cycle, Sequence sequence, RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch doneLatch) {
this.sequence = sequence;
this.buf = new int[cycle];
this.queue = queue;
this.barrier = barrier;
this.latch = latch;
this.doneLatch = doneLatch;
}
@Override
......@@ -804,56 +749,7 @@ public class ConcurrentTest {
}
finalIndex = p;
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class BusySubscriber extends Thread {
private final int[] buf;
private final RingQueue<Event> queue;
private final CyclicBarrier barrier;
private final CountDownLatch latch;
private final FanOut fanOut;
BusySubscriber(RingQueue<Event> queue, CyclicBarrier barrier, CountDownLatch latch, FanOut fanOut) {
this.buf = new int[20];
this.queue = queue;
this.barrier = barrier;
this.latch = latch;
this.fanOut = fanOut;
}
@Override
public void run() {
try {
barrier.await();
Os.sleep(10);
// subscribe
Sequence sequence = new SCSequence(0);
fanOut.and(sequence);
int p = 0;
while (p < buf.length) {
long cursor = sequence.next();
if (cursor < 0) {
LockSupport.parkNanos(1);
continue;
}
int v = queue.get(cursor).value;
sequence.done(cursor);
if (v == Integer.MIN_VALUE) {
break;
}
buf[p++] = v;
}
fanOut.remove(sequence);
latch.countDown();
doneLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册