diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java new file mode 100644 index 0000000000000000000000000000000000000000..15070484a0fb8493e5748190e76ffa96a613372e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ConsumerDedupPermitsUpdate extends ProducerConsumerBase { + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "combinations") + public Object[][] combinations() { + return new Object[][] { + // batching-enabled - queue-size + { false, 0 }, + { false, 1 }, + { false, 10 }, + { false, 100 }, + { true, 1 }, + { true, 10 }, + { true, 100 }, + }; + } + + @Test(timeOut = 30000, dataProvider = "combinations") + public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception { + String topic = "persistent://my-property/my-ns/my-topic-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test") + // Use high ack delay to simulate a message being tracked as dup + .acknowledgmentGroupTime(1, TimeUnit.HOURS) + .receiverQueueSize(receiverQueueSize) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(batchingEnabled) + .batchingMaxMessages(10) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create(); + + for (int i = 0; i < 30; i++) { + producer.sendAsync("hello-" + i); + } + producer.flush(); + + // Consumer receives and acks all the messages, though the acks + // are still cached in client lib + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "hello-" + i); + consumer.acknowledge(msg); + } + + // Trigger redelivery by unloading the topic. + admin.topics().unload(topic); + + // Consumer dedup logic will detect the dups and not bubble them up to the application + // (With zero-queue we cannot use receive with timeout) + if (receiverQueueSize > 0) { + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNull(msg); + } + + // The flow permits in consumer shouldn't have been messed up by the deduping + // and we should be able to get new messages through + for (int i = 0; i < 30; i++) { + producer.sendAsync("new-message-" + i); + } + producer.flush(); + + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "new-message-" + i); + consumer.acknowledge(msg); + } + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index d8a0de3b9a4be48d9907bdad86a1b0a40fb2a763..784644b21add6306d2d10b585505721f0800ed8e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -73,11 +73,8 @@ public abstract class ConsumerBase extends HandlerState implements Consumer(); - } + // Always use growable queue since items can exceed the advertised size + this.incomingMessages = new GrowableArrayBlockingQueue<>(); this.listenerExecutor = listenerExecutor; this.pendingReceives = Queues.newConcurrentLinkedQueue(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5d5b1fa79467a2b420ef6313625ee8041fd1b378..e5c9d53814dd2edb202f38fc23543f597654d82f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -778,18 +778,6 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle messageId.getEntryId()); } - MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", - topic, subscription, msgId); - } - if (conf.getReceiverQueueSize() == 0) { - increaseAvailablePermits(cnx); - } - return; - } - MessageMetadata msgMetadata = null; ByteBuf payload = headersAndPayload; @@ -806,6 +794,19 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle return; } + final int numMessages = msgMetadata.getNumMessagesInBatch(); + + MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); + if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", + topic, subscription, msgId); + } + + increaseAvailablePermits(cnx, numMessages); + return; + } + ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx); boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata); @@ -824,8 +825,6 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle return; } - final int numMessages = msgMetadata.getNumMessagesInBatch(); - // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {