diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 04264a24fd88733407a08428dd5a5eec79551a1f..c8bcf69e80a7cd9d257b469f645f97f450169b0e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1773,4 +1773,87 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } } + @Test + public void testRedeliveryFailOverConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int receiverQueueSize = 10; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(receiverQueueSize); + conf.setSubscriptionType(SubscriptionType.Failover); + // Only subscribe consumer + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", + producerConf); + + // (1) First round to produce-consume messages + int consumeMsgInParts = 4; + for (int i = 0; i < receiverQueueSize; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + // (1.a) consume first consumeMsgInParts msgs and trigger redeliver + Message msg = null; + List messages1 = Lists.newArrayList(); + for (int i = 0; i < consumeMsgInParts; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), consumeMsgInParts); + consumer.redeliverUnacknowledgedMessages(); + + // (1.b) consume second consumeMsgInParts msgs and trigger redeliver + messages1.clear(); + for (int i = 0; i < consumeMsgInParts; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), consumeMsgInParts); + consumer.redeliverUnacknowledgedMessages(); + + // (2) Second round to produce-consume messages + for (int i = 0; i < receiverQueueSize; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(100); + } + + int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts); + messages1.clear(); + for (int i = 0; i < remainingMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), remainingMsgs); + + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + + } + } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index ded3d7b5251d52e2a30bb940d730125abdaa11a9..2b18bb8dcc71d74faab7722973942c8caabb6102 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -902,13 +902,12 @@ public class ConsumerImpl extends ConsumerBase { synchronized (this) { currentSize = incomingMessages.size(); incomingMessages.clear(); - availablePermits.set(0); unAckedMessageTracker.clear(); batchMessageAckTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); if (currentSize > 0) { - sendFlowPermitsToBroker(cnx, currentSize); + increaseAvailablePermits(cnx, currentSize); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,