From 8df2868bcb4fa5371fb7eba89eb9fc9d3dbbf80a Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 18 Oct 2016 14:53:48 -0700 Subject: [PATCH] fix: flow permits on blocked consumer while redelivery of messages (#74) --- .../yahoo/pulsar/broker/service/Consumer.java | 31 ++- .../api/SimpleProducerConsumerTest.java | 182 +++++++++++++++++- 2 files changed, 207 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index a5d56e37869..7ae0215486d 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -23,7 +23,6 @@ import java.time.Instant; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -34,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Objects; +import com.google.common.collect.Lists; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck; @@ -458,11 +458,32 @@ public class Consumer { } public void redeliverUnacknowledgedMessages(List messageIds) { - List pendingPositions = messageIds.stream() - .map(messageIdData -> PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId())) - .filter(position -> pendingAcks.remove(position) != null) - .collect(Collectors.toList()); + + int totalRedeliveryMessages = 0; + List pendingPositions = Lists.newArrayList(); + for (MessageIdData msg : messageIds) { + PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); + Integer batchSize = pendingAcks.remove(position); + if (batchSize != null) { + totalRedeliveryMessages += batchSize; + pendingPositions.add(position); + } + } + + unackedMessages.addAndGet(-totalRedeliveryMessages); + blockedConsumerOnUnackedMsgs = false; subscription.redeliverUnacknowledgedMessages(this, pendingPositions); + + int numberOfBlockedPermits = Math.min(totalRedeliveryMessages, + permitsReceivedWhileConsumerBlocked.get()); + + // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages + if (numberOfBlockedPermits > 0) { + permitsReceivedWhileConsumerBlocked.getAndAdd(-numberOfBlockedPermits); + messagePermits.getAndAdd(numberOfBlockedPermits); + subscription.consumerFlow(this, numberOfBlockedPermits); + + } } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index f8b813bcaf3..81fb6b97949 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -55,6 +56,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.client.impl.ConsumerImpl; +import com.yahoo.pulsar.client.impl.MessageIdImpl; import com.yahoo.pulsar.client.util.FutureUtil; import com.yahoo.pulsar.common.api.PulsarDecoder; @@ -1044,7 +1046,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages); } } - + /** * Verify: iteration of * a. message receive w/o acking @@ -1535,4 +1537,182 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { consumer.close(); log.info("-- Exiting {} test --", methodName); } + + /** + * It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets + * blocked due to unacked messsages + * + * Usecase: produce message with 10ms interval: so, consumer can consume only 10 messages without acking + * + * @throws Exception + */ + @Test + public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() throws Exception { + log.info("-- Starting {} test --", methodName); + + int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(); + try { + final int unAckedMessagesBufferSize = 10; + final int receiverQueueSize = 20; + final int totalProducedMsgs = 20; + + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize); + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(receiverQueueSize); + conf.setSubscriptionType(SubscriptionType.Shared); + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", + producerConf); + + // (1) Produced Messages + for (int i = 0; i < totalProducedMsgs; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + + // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize + Message msg = null; + List messages1 = Lists.newArrayList(); + for (int i = 0; i < totalProducedMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + + // client should not receive all produced messages and should be blocked due to unack-messages + assertEquals(messages1.size(), unAckedMessagesBufferSize); + Set redeliveryMessages = messages1.stream().map(m -> { + return (MessageIdImpl) m.getMessageId(); + }).collect(Collectors.toSet()); + + // (3) redeliver all consumed messages + consumer.redeliverUnacknowledgedMessages(Lists.newArrayList(redeliveryMessages)); + Thread.sleep(1000); + + Set messages2 = Sets.newHashSet(); + for (int i = 0; i < totalProducedMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages2.add((MessageIdImpl) msg.getMessageId()); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + + assertEquals(messages1.size(), messages2.size()); + // (4) Verify: redelivered all previous unacked-consumed messages + messages2.removeAll(redeliveryMessages); + assertEquals(messages2.size(), 0); + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } catch (Exception e) { + fail(); + } finally { + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages); + } + } + + /** + * It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets + * blocked due to unacked messsages + * + * Usecase: Consumer starts consuming only after all messages have been produced. + * So, consumer consumes total receiver-queue-size number messages => ask for redelivery and receives all messages again. + * + * @throws Exception + */ + @Test(invocationCount=10) + public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception { + log.info("-- Starting {} test --", methodName); + + int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(); + try { + final int unAckedMessagesBufferSize = 10; + final int receiverQueueSize = 20; + final int totalProducedMsgs = 50; + + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize); + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(receiverQueueSize); + conf.setSubscriptionType(SubscriptionType.Shared); + // Only subscribe consumer + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); + consumer.close(); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", + producerConf); + + // (1) Produced Messages + for (int i = 0; i < totalProducedMsgs; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + + // (1.a) start consumer again + consumer = (ConsumerImpl) pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic", + "subscriber-1", conf); + + // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize + Message msg = null; + List messages1 = Lists.newArrayList(); + for (int i = 0; i < totalProducedMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + + // client should not receive all produced messages and should be blocked due to unack-messages + assertEquals(messages1.size(), receiverQueueSize); + Set redeliveryMessages = messages1.stream().map(m -> { + return (MessageIdImpl) m.getMessageId(); + }).collect(Collectors.toSet()); + + // (3) redeliver all consumed messages + consumer.redeliverUnacknowledgedMessages(Lists.newArrayList(redeliveryMessages)); + Thread.sleep(1000); + + Set messages2 = Sets.newHashSet(); + for (int i = 0; i < totalProducedMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages2.add((MessageIdImpl) msg.getMessageId()); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + + assertEquals(messages1.size(), messages2.size()); + // (4) Verify: redelivered all previous unacked-consumed messages + messages2.removeAll(redeliveryMessages); + assertEquals(messages2.size(), 0); + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } catch (Exception e) { + fail(); + } finally { + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages); + } + } + } -- GitLab