提交 340dc8d1 编写于 作者: M Matteo Merli 提交者: GitHub

Fix #91 - Intermittent test failure on...

Fix #91 - Intermittent test failure on SimpleProducerConsumerTest.testSharedConsumerAckDifferentConsumer (#92)
上级 551d42d8
......@@ -20,8 +20,8 @@ import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
......@@ -568,12 +568,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
/**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
*
*
* Usecase 2: 2 Active Subscriptions (faster and slower) and slower gets closed - 2 subscribers - Produce Messages -
* 1 faster-subscriber consumes all messages and another slower-subscriber none - EntryCache should have cached
* messages as slower-subscriber has not consumed messages yet - close slower-subscriber - EntryCache should be
* cleared
*
*
* @throws Exception
*/
@Test
......@@ -864,19 +864,19 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
assertEquals(message.getBytes().length, msgLength.get());
}
}
/**
* consume message from consumer1 and send acknowledgement from different consumer subscribed under same
* subscription-name
*
*
* @throws Exception
*/
@Test
@Test(timeOut = 30000)
public void testSharedConsumerAckDifferentConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(5);
conf.setReceiverQueueSize(1);
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
......@@ -895,15 +895,13 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Set<Message> consumerMsgSet1 = Sets.newHashSet();
Set<Message> consumerMsgSet2 = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
msg = consumer1.receive();
consumerMsgSet1.add(msg);
}
for (int i = 0; i < 5; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
msg = consumer2.receive();
consumerMsgSet2.add(msg);
}
consumerMsgSet1.stream().forEach(m -> {
try {
consumer2.acknowledge(m);
......@@ -923,7 +921,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer2.redeliverUnacknowledgedMessages();
try {
if (consumer1.receive(1, TimeUnit.SECONDS) != null || consumer2.receive(1, TimeUnit.SECONDS) != null) {
if (consumer1.receive(100, TimeUnit.MILLISECONDS) != null
|| consumer2.receive(100, TimeUnit.MILLISECONDS) != null) {
fail();
}
} finally {
......@@ -961,16 +960,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
}
}
/**
* Verify: Consumer stops receiving msg when reach unack-msg limit and
* Verify: Consumer stops receiving msg when reach unack-msg limit and
* starts receiving once acks messages
* 1. Produce X (600) messages
* 1. Produce X (600) messages
* 2. Consumer has receive size (10) and receive message without acknowledging
* 3. Consumer will stop receiving message after unAckThreshold = 500
* 4. Consumer acks messages and starts consuming remanining messages
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* @throws Exception
*/
@Test
......@@ -1048,19 +1047,19 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
/**
* Verify: iteration of
* a. message receive w/o acking
* Verify: iteration of
* a. message receive w/o acking
* b. stop receiving msg
* c. ack msgs
* d. started receiving msgs
*
* d. started receiving msgs
*
* 1. Produce total X (1500) messages
* 2. Consumer consumes messages without acking until stop receiving
* 2. Consumer consumes messages without acking until stop receiving
* from broker due to reaching ack-threshold (500)
* 3. Consumer acks messages after stop getting messages
* 3. Consumer acks messages after stop getting messages
* 4. Consumer again tries to consume messages
* 5. Consumer should be able to complete consuming all 1500 messages in 3 iteration (1500/500)
*
*
* @throws Exception
*/
@Test
......@@ -1133,12 +1132,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
/**
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
......@@ -1163,7 +1162,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
"subscriber-1", conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
producerConf);
......@@ -1173,7 +1172,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer.send(message.getBytes());
}
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
......@@ -1239,7 +1238,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testUnackBlockRedeliverMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -1310,7 +1309,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test(dataProvider = "batch")
public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -1330,7 +1329,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
"subscriber-1", conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
......@@ -1348,7 +1347,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
FutureUtil.waitForAll(futures).get();
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
......@@ -1398,11 +1397,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
/**
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
......@@ -1427,7 +1426,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
"subscriber-1", conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
producerConf);
......@@ -1437,7 +1436,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer.send(message.getBytes());
}
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
......@@ -1452,9 +1451,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
break;
}
}
assertEquals(messages.size(), maxUnackedMessages); //consumer1
// (3) ack for all UnackedMessages from consumer2
messages.forEach(m -> {
try {
......@@ -1485,7 +1484,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
break;
}
}
// verify total-consumer messages = total-produce messages
assertEquals(totalProducedMsgs, totalReceiveMessages);
producer.close();
......@@ -1537,13 +1536,13 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
*
* Usecase: produce message with 10ms interval: so, consumer can consume only 10 messages without acking
*
*
* @throws Exception
*/
@Test
......@@ -1622,14 +1621,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
* Usecase: Consumer starts consuming only after all messages have been produced.
*
* Usecase: Consumer starts consuming only after all messages have been produced.
* So, consumer consumes total receiver-queue-size number messages => ask for redelivery and receives all messages again.
*
*
* @throws Exception
*/
@Test(invocationCount=10)
......@@ -1714,5 +1713,5 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册