From d3cb108590713735ef1b23ff61997bf68e0fd7f3 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Wed, 13 Nov 2019 10:42:16 +0800 Subject: [PATCH] [Issue 5476]Fix message deduplicate issue while using external sequence id with batch produce (#5491) Fixes #5476 ### Motivation Fix #5476 ### Modifications 1. Add `last_sequence_id` in MessageMetadata and CommandSend, use sequence id and last_sequence_id to indicate the batch `lowest_sequence_id` and `highest_sequence_id`. 2. Handle batch message deduplicate check in MessageDeduplication 3. Response the `last_sequence_id` to client and add message deduplicate check in client --- .../pulsar/broker/service/Producer.java | 82 +++++++++++-- .../pulsar/broker/service/ServerCnx.java | 7 +- .../apache/pulsar/broker/service/Topic.java | 16 ++- .../persistent/MessageDeduplication.java | 12 +- .../persistent/PersistentReplicator.java | 3 +- .../persistent/MessageDuplicationTest.java | 43 +++++++ .../client/api/ClientDeduplicationTest.java | 106 ++++++++++++++++ .../impl/BatchMessageContainerImpl.java | 25 ++-- .../pulsar/client/impl/ProducerImpl.java | 86 +++++++++---- .../pulsar/common/api/proto/PulsarApi.java | 114 ++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 31 +++++ pulsar-common/src/main/proto/PulsarApi.proto | 6 + 12 files changed, 487 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 915c42d3c37..5ead8c944ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -132,6 +132,25 @@ public class Producer { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { + beforePublish(producerId, sequenceId, headersAndPayload, batchSize); + publishMessageToTopic(headersAndPayload, sequenceId, batchSize); + } + + public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, + ByteBuf headersAndPayload, long batchSize) { + if (lowestSequenceId > highestSequenceId) { + cnx.ctx().channel().eventLoop().execute(() -> { + cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError, + "Invalid lowest or highest sequence id")); + cnx.completedSendOperation(isNonPersistentTopic); + }); + return; + } + beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize); + publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize); + } + + public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { if (isClosed) { cnx.ctx().channel().eventLoop().execute(() -> { cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.PersistenceError, @@ -170,11 +189,20 @@ public class Producer { } startPublishOperation((int) batchSize, headersAndPayload.readableBytes()); + } + + private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, System.nanoTime())); } + private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) { + topic.publishMessage(headersAndPayload, + MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + System.nanoTime())); + } + private boolean verifyChecksum(ByteBuf headersAndPayload) { if (hasChecksum(headersAndPayload)) { int readerIndex = headersAndPayload.readerIndex(); @@ -257,6 +285,9 @@ public class Producer { private String originalProducerName; private long originalSequenceId; + private long highestSequenceId; + private long originalHighestSequenceId; + public String getProducerName() { return producer.getProducerName(); } @@ -265,6 +296,11 @@ public class Producer { return sequenceId; } + @Override + public long getHighestSequenceId() { + return highestSequenceId; + } + @Override public void setOriginalProducerName(String originalProducerName) { this.originalProducerName = originalProducerName; @@ -285,6 +321,16 @@ public class Producer { return originalSequenceId; } + @Override + public void setOriginalHighestSequenceId(long originalHighestSequenceId) { + this.originalHighestSequenceId = originalHighestSequenceId; + } + + @Override + public long getOriginalHighestSequenceId() { + return originalHighestSequenceId; + } + /** * Executed from managed ledger thread when the message is persisted */ @@ -298,7 +344,8 @@ public class Producer { if (!(exception instanceof TopicClosedException)) { // For TopicClosed exception there's no need to send explicit error, since the client was // already notified - producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, + long callBackSequenceId = Math.max(highestSequenceId, sequenceId); + producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, callBackSequenceId, serverError, exception.getMessage())); } producer.cnx.completedSendOperation(producer.isNonPersistentTopic); @@ -330,8 +377,9 @@ public class Producer { // stats rateIn.recordMultipleEvents(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); + long callBackSequenceId = Math.max(highestSequenceId, sequenceId); producer.cnx.ctx().writeAndFlush( - Commands.newSendReceipt(producer.producerId, sequenceId, ledgerId, entryId), + Commands.newSendReceipt(producer.producerId, callBackSequenceId, ledgerId, entryId), producer.cnx.ctx().voidPromise()); producer.cnx.completedSendOperation(producer.isNonPersistentTopic); producer.publishOperationCompleted(); @@ -347,7 +395,22 @@ public class Producer { callback.msgSize = msgSize; callback.batchSize = batchSize; callback.originalProducerName = null; - callback.originalSequenceId = -1; + callback.originalSequenceId = -1L; + callback.startTimeNs = startTimeNs; + return callback; + } + + static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, + int msgSize, long batchSize, long startTimeNs) { + MessagePublishContext callback = RECYCLER.get(); + callback.producer = producer; + callback.sequenceId = lowestSequenceId; + callback.highestSequenceId = highestSequenceId; + callback.rateIn = rateIn; + callback.msgSize = msgSize; + callback.batchSize = batchSize; + callback.originalProducerName = null; + callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; return callback; } @@ -366,13 +429,16 @@ public class Producer { public void recycle() { producer = null; - sequenceId = -1; + sequenceId = -1L; + highestSequenceId = -1L; + originalSequenceId = -1L; + originalHighestSequenceId = -1L; rateIn = null; msgSize = 0; - ledgerId = -1; - entryId = -1; - batchSize = 0; - startTimeNs = -1; + ledgerId = -1L; + entryId = -1L; + batchSize = 0L; + startTimeNs = -1L; recyclerHandle.recycle(this); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d0ba3b93cd9..6b38d377b97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1051,7 +1051,12 @@ public class ServerCnx extends PulsarHandler { startSendOperation(producer); // Persist the message - producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages()); + if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { + producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), + headersAndPayload, send.getNumMessages()); + } else { + producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages()); + } } private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 419a9838bcf..d59b359d0d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -53,7 +53,7 @@ public interface Topic { } default long getSequenceId() { - return -1; + return -1L; } default void setOriginalProducerName(String originalProducerName) { @@ -73,10 +73,22 @@ public interface Topic { } default long getOriginalSequenceId() { - return -1; + return -1L; } void completed(Exception e, long ledgerId, long entryId); + + default long getHighestSequenceId() { + return -1L; + } + + default void setOriginalHighestSequenceId(long originalHighestSequenceId) { + + } + + default long getOriginalHighestSequenceId() { + return -1L; + } } void publishMessage(ByteBuf headersAndPayload, PublishContext callback); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e898b4ab203..dc4604e8a40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -160,7 +160,7 @@ public class MessageDeduplication { MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); String producerName = md.getProducerName(); - long sequenceId = md.getSequenceId(); + long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId()); highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); @@ -283,6 +283,7 @@ public class MessageDeduplication { String producerName = publishContext.getProducerName(); long sequenceId = publishContext.getSequenceId(); + long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId); if (producerName.startsWith(replicatorPrefix)) { // Message is coming from replication, we need to use the original producer name and sequence id // for the purpose of deduplication and not rely on the "replicator" name. @@ -290,8 +291,10 @@ public class MessageDeduplication { MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); producerName = md.getProducerName(); sequenceId = md.getSequenceId(); + highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); publishContext.setOriginalProducerName(producerName); publishContext.setOriginalSequenceId(sequenceId); + publishContext.setOriginalHighestSequenceId(highestSequenceId); headersAndPayload.readerIndex(readerIndex); md.recycle(); } @@ -317,8 +320,7 @@ public class MessageDeduplication { return MessageDupStatus.Unknown; } } - - highestSequencedPushed.put(producerName, sequenceId); + highestSequencedPushed.put(producerName, highestSequenceId); } return MessageDupStatus.NotDup; } @@ -333,13 +335,15 @@ public class MessageDeduplication { String producerName = publishContext.getProducerName(); long sequenceId = publishContext.getSequenceId(); + long highestSequenceId = publishContext.getHighestSequenceId(); if (publishContext.getOriginalProducerName() != null) { // In case of replicated messages, this will be different from the current replicator producer name producerName = publishContext.getOriginalProducerName(); sequenceId = publishContext.getOriginalSequenceId(); + highestSequenceId = publishContext.getOriginalHighestSequenceId(); } - highestSequencedPersisted.put(producerName, sequenceId); + highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId)); if (++snapshotCounter >= snapshotInterval) { snapshotCounter = 0; takeSnapshot(position); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 9cc794235dc..622098e76ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyExceptio import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -377,7 +378,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat @Override public void sendComplete(Exception exception) { - if (exception != null) { + if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) { log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName, replicator.localCluster, replicator.remoteCluster, exception); // cursor shoud be rewinded since it was incremented when readMoreEntries diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index a29de119b0c..5cfdef839db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -45,6 +45,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @Slf4j @@ -125,6 +126,24 @@ public class MessageDuplicationTest { lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertTrue(lastSequenceIdPushed != null); assertEquals(lastSequenceIdPushed.longValue(), 5); + + // update highest sequence persisted + messageDeduplication.highestSequencedPushed.put(producerName1, 0L); + messageDeduplication.highestSequencedPersisted.put(producerName1, 0L); + byteBuf1 = getMessage(producerName1, 0); + publishContext1 = getPublishContext(producerName1, 1, 5); + status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertNotNull(lastSequenceIdPushed); + assertEquals(lastSequenceIdPushed.longValue(), 5); + + publishContext1 = getPublishContext(producerName1, 4, 8); + status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown); + lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); + assertNotNull(lastSequenceIdPushed); + assertEquals(lastSequenceIdPushed.longValue(), 5); } @Test @@ -319,4 +338,28 @@ public class MessageDuplicationTest { } }); } + + public Topic.PublishContext getPublishContext(String producerName, long seqId, long lastSequenceId) { + return spy(new Topic.PublishContext() { + @Override + public String getProducerName() { + return producerName; + } + + @Override + public long getSequenceId() { + return seqId; + } + + @Override + public long getHighestSequenceId() { + return lastSequenceId; + } + + @Override + public void completed(Exception e, long ledgerId, long entryId) { + + } + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 03f12b42486..97e7869a8bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -20,7 +20,10 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.testng.annotations.AfterClass; @@ -156,4 +159,107 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { producer.close(); } + + @Test(timeOut = 30000) + public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception { + String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId"; + admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); + + // Set infinite timeout + ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topic) + .producerName("my-producer-name").enableBatching(true).batchingMaxMessages(10).sendTimeout(0, TimeUnit.SECONDS); + Producer producer = producerBuilder.create(); + + assertEquals(producer.getLastSequenceId(), -1L); + + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscription") + .subscribe(); + + producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-1".getBytes()).sequenceId(3).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(5).sendAsync(); + + producer.flush(); + + // Repeat the messages and verify they're not received by consumer + producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); + producer.close(); + + for (int i = 0; i < 3; i++) { + Message msg = consumer.receive(); + assertEquals(new String(msg.getData()), "my-message-" + i); + consumer.acknowledge(msg); + } + + // No other messages should be received + Message msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + // Kill and restart broker + restartBroker(); + + producer = producerBuilder.create(); + assertEquals(producer.getLastSequenceId(), 5L); + + // Repeat the messages and verify they're not received by consumer + producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); + producer.flush(); + + msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + producer.close(); + } + + @Test(timeOut = 30000) + public void testProducerDeduplicationNonBatchAsync() throws Exception { + String topic = "persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync"; + admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); + + // Set infinite timeout + ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topic) + .producerName("my-producer-name").enableBatching(false).sendTimeout(0, TimeUnit.SECONDS); + Producer producer = producerBuilder.create(); + + assertEquals(producer.getLastSequenceId(), -1L); + + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscription") + .subscribe(); + + producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-1".getBytes()).sequenceId(3).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(5).sendAsync(); + + // Repeat the messages and verify they're not received by consumer + producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); + producer.close(); + + for (int i = 0; i < 3; i++) { + Message msg = consumer.receive(); + assertEquals(new String(msg.getData()), "my-message-" + i); + consumer.acknowledge(msg); + } + + // No other messages should be received + Message msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + // Kill and restart broker + restartBroker(); + + producer = producerBuilder.create(); + assertEquals(producer.getLastSequenceId(), 5L); + + // Repeat the messages and verify they're not received by consumer + producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); + + msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index b5ee767304b..24eaf9d5198 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -50,7 +50,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder(); // sequence id for this batch which will be persisted as a single entry by broker - private long sequenceId = -1; + private long lowestSequenceId = -1L; + private long highestSequenceId = -1L; private ByteBuf batchedMessageMetadataAndPayload; private List> messages = Lists.newArrayList(); protected SendCallback previousCallback = null; @@ -68,7 +69,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { if (++numMessagesInBatch == 1) { // some properties are common amongst the different messages in the batch, hence we just pick it up from // the first message - sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); + lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES)); @@ -80,6 +81,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { previousCallback = callback; currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); messages.add(msg); + if (lowestSequenceId == -1L) { + lowestSequenceId = msg.getSequenceId(); + messageMetadata.setSequenceId(lowestSequenceId); + } + highestSequenceId = msg.getSequenceId(); + producer.lastSequenceIdPushed = msg.getSequenceId(); } private ByteBuf getCompressedBatchMetadataAndPayload() { @@ -131,7 +138,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { messageMetadata.clear(); numMessagesInBatch = 0; currentBatchSizeBytes = 0; - sequenceId = -1; + lowestSequenceId = -1L; + highestSequenceId = -1L; batchedMessageMetadataAndPayload = null; } @@ -149,7 +157,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, - sequenceId, t); + lowestSequenceId, t); } ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload); clear(); @@ -164,10 +172,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); messageMetadata.setNumMessagesInBatch(numMessagesInBatch); - ByteBufPair cmd = producer.sendMessage(producer.producerId, sequenceId, numMessagesInBatch, - messageMetadata.build(), encryptedPayload); + messageMetadata.setHighestSequenceId(highestSequenceId); + ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), + messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata.build(), encryptedPayload); - OpSendMsg op = OpSendMsg.create(messages, cmd, sequenceId, firstCallback); + OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), + messageMetadata.getHighestSequenceId(), firstCallback); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { cmd.release(); @@ -181,6 +191,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { op.setNumMessagesInBatch(numMessagesInBatch); op.setBatchSizeByte(currentBatchSizeBytes); + lowestSequenceId = -1L; return op; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index d5e86de0559..2ccbfe19c50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -111,6 +111,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final CompressionCodec compressor; private volatile long lastSequenceIdPublished; + protected volatile long lastSequenceIdPushed; + private MessageCrypto msgCrypto = null; private ScheduledFuture keyGeneratorTask = null; @@ -141,10 +143,12 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne if (conf.getInitialSequenceId() != null) { long initialSequenceId = conf.getInitialSequenceId(); this.lastSequenceIdPublished = initialSequenceId; - this.msgIdGenerator = initialSequenceId + 1; + this.lastSequenceIdPushed = initialSequenceId; + this.msgIdGenerator = initialSequenceId + 1L; } else { - this.lastSequenceIdPublished = -1; - this.msgIdGenerator = 0; + this.lastSequenceIdPublished = -1L; + this.lastSequenceIdPushed = -1L; + this.msgIdGenerator = 0L; } if (conf.isEncryptionEnabled()) { @@ -383,15 +387,27 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne msgMetadataBuilder.setUncompressedSize(uncompressedSize); } if (canAddToBatch(msg)) { - // handle boundary cases where message being added would exceed - // batch size and/or max message size if (canAddToCurrentBatch(msg)) { - batchMessageContainer.add(msg, callback); - lastSendFuture = callback.getFuture(); - payload.release(); - if (batchMessageContainer.getNumMessagesInBatch() == maxNumMessagesInBatch - || batchMessageContainer.getCurrentBatchSize() >= BatchMessageContainerImpl.MAX_MESSAGE_BATCH_SIZE_BYTES) { - batchMessageAndSend(); + // should trigger complete the batch message, new message will add to a new batch and new batch + // sequence id use the new message, so that broker can handle the message duplication + if (sequenceId <= lastSequenceIdPushed) { + if (sequenceId <= lastSequenceIdPublished) { + log.warn("Message with sequence id {} is definitely a duplicate", sequenceId); + } else { + log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.", + sequenceId); + } + doBatchSendAndAdd(msg, callback, payload); + } else { + // handle boundary cases where message being added would exceed + // batch size and/or max message size + batchMessageContainer.add(msg, callback); + lastSendFuture = callback.getFuture(); + payload.release(); + if (batchMessageContainer.getNumMessagesInBatch() == maxNumMessagesInBatch + || batchMessageContainer.getCurrentBatchSize() >= BatchMessageContainerImpl.MAX_MESSAGE_BATCH_SIZE_BYTES) { + batchMessageAndSend(); + } } } else { doBatchSendAndAdd(msg, callback, payload); @@ -535,15 +551,21 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata, ByteBuf compressedPayload) { - ChecksumType checksumType; + return Commands.newSend(producerId, sequenceId, numMessages, getChecksumType(), msgMetadata, compressedPayload); + } + protected ByteBufPair sendMessage(long producerId, long lowestSequenceId, long highestSequenceId, int numMessages, MessageMetadata msgMetadata, + ByteBuf compressedPayload) { + return Commands.newSend(producerId, lowestSequenceId, highestSequenceId, numMessages, getChecksumType(), msgMetadata, compressedPayload); + } + + private ChecksumType getChecksumType() { if (connectionHandler.getClientCnx() == null || connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { - checksumType = ChecksumType.Crc32c; + return ChecksumType.Crc32c; } else { - checksumType = ChecksumType.None; + return ChecksumType.None; } - return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload); } private boolean canAddToBatch(MessageImpl msg) { @@ -777,8 +799,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne } return; } - - long expectedSequenceId = op.sequenceId; + long expectedSequenceId = getHighestSequenceId(op); if (sequenceId > expectedSequenceId) { log.warn("[{}] [{}] Got ack for msg. expecting: {} - got: {} - queue-size: {}", topic, producerName, expectedSequenceId, sequenceId, pendingMessages.size()); @@ -804,7 +825,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne if (callback) { op = pendingCallbacks.poll(); if (op != null) { - lastSequenceIdPublished = op.sequenceId + op.numMessagesInBatch - 1; + lastSequenceIdPublished = Math.max(lastSequenceIdPublished, getHighestSequenceId(op)); op.setMessageId(ledgerId, entryId, partitionIndex); try { // Need to protect ourselves from any exception being thrown in the future handler from the @@ -820,6 +841,9 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne } } + private long getHighestSequenceId(OpSendMsg op) { + return Math.max(op.highestSequenceId, op.sequenceId); + } private void releaseSemaphoreForSendOp(OpSendMsg op) { semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); } @@ -844,7 +868,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne log.debug("[{}] [{}] Got send failure for timed out msg {}", topic, producerName, sequenceId); } } else { - long expectedSequenceId = op.sequenceId; + long expectedSequenceId = getHighestSequenceId(op); if (sequenceId == expectedSequenceId) { boolean corrupted = !verifyLocalBufferIsNotCorrupted(op); if (corrupted) { @@ -928,6 +952,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne long createdAt; long batchSizeByte = 0; int numMessagesInBatch = 1; + long lowestSequenceId; + long highestSequenceId; static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) { OpSendMsg op = RECYCLER.get(); @@ -949,14 +975,29 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne return op; } + static OpSendMsg create(List> msgs, ByteBufPair cmd, long lowestSequenceId, + long highestSequenceId, SendCallback callback) { + OpSendMsg op = RECYCLER.get(); + op.msgs = msgs; + op.cmd = cmd; + op.callback = callback; + op.lowestSequenceId = lowestSequenceId; + op.highestSequenceId = highestSequenceId; + op.sequenceId = lowestSequenceId; + op.createdAt = System.currentTimeMillis(); + return op; + } + void recycle() { msg = null; msgs = null; cmd = null; callback = null; rePopulate = null; - sequenceId = -1; - createdAt = -1; + sequenceId = -1L; + createdAt = -1L; + lowestSequenceId = -1L; + highestSequenceId = -1L; recyclerHandle.recycle(this); } @@ -1399,6 +1440,9 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne batchMessageAndSend(); } pendingMessages.put(op); + if (op.msg != null) { + lastSequenceIdPushed = Math.max(lastSequenceIdPushed, getHighestSequenceId(op)); + } ClientCnx cnx = cnx(); if (isConnected()) { if (op.msg != null && op.msg.getSchemaState() == None) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 2074659432d..dcda3df6dbe 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -3589,6 +3589,10 @@ public final class PulsarApi { // optional uint64 txnid_most_bits = 23 [default = 0]; boolean hasTxnidMostBits(); long getTxnidMostBits(); + + // optional uint64 highest_sequence_id = 24 [default = 0]; + boolean hasHighestSequenceId(); + long getHighestSequenceId(); } public static final class MessageMetadata extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -3949,6 +3953,16 @@ public final class PulsarApi { return txnidMostBits_; } + // optional uint64 highest_sequence_id = 24 [default = 0]; + public static final int HIGHEST_SEQUENCE_ID_FIELD_NUMBER = 24; + private long highestSequenceId_; + public boolean hasHighestSequenceId() { + return ((bitField0_ & 0x00040000) == 0x00040000); + } + public long getHighestSequenceId() { + return highestSequenceId_; + } + private void initFields() { producerName_ = ""; sequenceId_ = 0L; @@ -3971,6 +3985,7 @@ public final class PulsarApi { markerType_ = 0; txnidLeastBits_ = 0L; txnidMostBits_ = 0L; + highestSequenceId_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4076,6 +4091,9 @@ public final class PulsarApi { if (((bitField0_ & 0x00020000) == 0x00020000)) { output.writeUInt64(23, txnidMostBits_); } + if (((bitField0_ & 0x00040000) == 0x00040000)) { + output.writeUInt64(24, highestSequenceId_); + } } private int memoizedSerializedSize = -1; @@ -4173,6 +4191,10 @@ public final class PulsarApi { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(23, txnidMostBits_); } + if (((bitField0_ & 0x00040000) == 0x00040000)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(24, highestSequenceId_); + } memoizedSerializedSize = size; return size; } @@ -4328,6 +4350,8 @@ public final class PulsarApi { bitField0_ = (bitField0_ & ~0x00080000); txnidMostBits_ = 0L; bitField0_ = (bitField0_ & ~0x00100000); + highestSequenceId_ = 0L; + bitField0_ = (bitField0_ & ~0x00200000); return this; } @@ -4449,6 +4473,10 @@ public final class PulsarApi { to_bitField0_ |= 0x00020000; } result.txnidMostBits_ = txnidMostBits_; + if (((from_bitField0_ & 0x00200000) == 0x00200000)) { + to_bitField0_ |= 0x00040000; + } + result.highestSequenceId_ = highestSequenceId_; result.bitField0_ = to_bitField0_; return result; } @@ -4539,6 +4567,9 @@ public final class PulsarApi { if (other.hasTxnidMostBits()) { setTxnidMostBits(other.getTxnidMostBits()); } + if (other.hasHighestSequenceId()) { + setHighestSequenceId(other.getHighestSequenceId()); + } return this; } @@ -4703,6 +4734,11 @@ public final class PulsarApi { txnidMostBits_ = input.readUInt64(); break; } + case 192: { + bitField0_ |= 0x00200000; + highestSequenceId_ = input.readUInt64(); + break; + } } } } @@ -5393,6 +5429,27 @@ public final class PulsarApi { return this; } + // optional uint64 highest_sequence_id = 24 [default = 0]; + private long highestSequenceId_ ; + public boolean hasHighestSequenceId() { + return ((bitField0_ & 0x00200000) == 0x00200000); + } + public long getHighestSequenceId() { + return highestSequenceId_; + } + public Builder setHighestSequenceId(long value) { + bitField0_ |= 0x00200000; + highestSequenceId_ = value; + + return this; + } + public Builder clearHighestSequenceId() { + bitField0_ = (bitField0_ & ~0x00200000); + highestSequenceId_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata) } @@ -15327,6 +15384,10 @@ public final class PulsarApi { // optional uint64 txnid_most_bits = 5 [default = 0]; boolean hasTxnidMostBits(); long getTxnidMostBits(); + + // optional uint64 highest_sequence_id = 6 [default = 0]; + boolean hasHighestSequenceId(); + long getHighestSequenceId(); } public static final class CommandSend extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -15413,12 +15474,23 @@ public final class PulsarApi { return txnidMostBits_; } + // optional uint64 highest_sequence_id = 6 [default = 0]; + public static final int HIGHEST_SEQUENCE_ID_FIELD_NUMBER = 6; + private long highestSequenceId_; + public boolean hasHighestSequenceId() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getHighestSequenceId() { + return highestSequenceId_; + } + private void initFields() { producerId_ = 0L; sequenceId_ = 0L; numMessages_ = 1; txnidLeastBits_ = 0L; txnidMostBits_ = 0L; + highestSequenceId_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -15460,6 +15532,9 @@ public final class PulsarApi { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeUInt64(5, txnidMostBits_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeUInt64(6, highestSequenceId_); + } } private int memoizedSerializedSize = -1; @@ -15488,6 +15563,10 @@ public final class PulsarApi { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(5, txnidMostBits_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(6, highestSequenceId_); + } memoizedSerializedSize = size; return size; } @@ -15611,6 +15690,8 @@ public final class PulsarApi { bitField0_ = (bitField0_ & ~0x00000008); txnidMostBits_ = 0L; bitField0_ = (bitField0_ & ~0x00000010); + highestSequenceId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -15664,6 +15745,10 @@ public final class PulsarApi { to_bitField0_ |= 0x00000010; } result.txnidMostBits_ = txnidMostBits_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.highestSequenceId_ = highestSequenceId_; result.bitField0_ = to_bitField0_; return result; } @@ -15685,6 +15770,9 @@ public final class PulsarApi { if (other.hasTxnidMostBits()) { setTxnidMostBits(other.getTxnidMostBits()); } + if (other.hasHighestSequenceId()) { + setHighestSequenceId(other.getHighestSequenceId()); + } return this; } @@ -15747,6 +15835,11 @@ public final class PulsarApi { txnidMostBits_ = input.readUInt64(); break; } + case 48: { + bitField0_ |= 0x00000020; + highestSequenceId_ = input.readUInt64(); + break; + } } } } @@ -15858,6 +15951,27 @@ public final class PulsarApi { return this; } + // optional uint64 highest_sequence_id = 6 [default = 0]; + private long highestSequenceId_ ; + public boolean hasHighestSequenceId() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getHighestSequenceId() { + return highestSequenceId_; + } + public Builder setHighestSequenceId(long value) { + bitField0_ |= 0x00000020; + highestSequenceId_ = value; + + return this; + } + public Builder clearHighestSequenceId() { + bitField0_ = (bitField0_ & ~0x00000020); + highestSequenceId_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSend) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index bc9ed1ca5cd..4322b3a94ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -443,6 +443,12 @@ public class Commands { return newSend(producerId, sequenceId, numMessaegs, 0, 0, checksumType, messageMetadata, payload); } + public static ByteBufPair newSend(long producerId, long lowestSequenceId, long highestSequenceId, int numMessaegs, + ChecksumType checksumType, MessageMetadata messageMetadata, ByteBuf payload) { + return newSend(producerId, lowestSequenceId, highestSequenceId, numMessaegs, 0, 0, + checksumType, messageMetadata, payload); + } + public static ByteBufPair newSend(long producerId, long sequenceId, int numMessages, long txnIdLeastBits, long txnIdMostBits, ChecksumType checksumType, MessageMetadata messageData, ByteBuf payload) { @@ -467,6 +473,31 @@ public class Commands { return res; } + public static ByteBufPair newSend(long producerId, long lowestSequenceId, long highestSequenceId, int numMessages, + long txnIdLeastBits, long txnIdMostBits, ChecksumType checksumType, + MessageMetadata messageData, ByteBuf payload) { + CommandSend.Builder sendBuilder = CommandSend.newBuilder(); + sendBuilder.setProducerId(producerId); + sendBuilder.setSequenceId(lowestSequenceId); + sendBuilder.setHighestSequenceId(highestSequenceId); + if (numMessages > 1) { + sendBuilder.setNumMessages(numMessages); + } + if (txnIdLeastBits > 0) { + sendBuilder.setTxnidLeastBits(txnIdLeastBits); + } + if (txnIdMostBits > 0) { + sendBuilder.setTxnidMostBits(txnIdMostBits); + } + CommandSend send = sendBuilder.build(); + + ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send), + checksumType, messageData, payload); + send.recycle(); + sendBuilder.recycle(); + return res; + } + public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 66cb7b58661..f9c88b5f8a0 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -132,6 +132,9 @@ message MessageMetadata { // transaction related message info optional uint64 txnid_least_bits = 22 [default = 0]; optional uint64 txnid_most_bits = 23 [default = 0]; + + /// Add highest sequence id to support batch message with external sequence id + optional uint64 highest_sequence_id = 24 [default = 0]; } message SingleMessageMetadata { @@ -411,6 +414,9 @@ message CommandSend { optional int32 num_messages = 3 [default = 1]; optional uint64 txnid_least_bits = 4 [default = 0]; optional uint64 txnid_most_bits = 5 [default = 0]; + + /// Add highest sequence id to support batch message with external sequence id + optional uint64 highest_sequence_id = 6 [default = 0]; } message CommandSendReceipt { -- GitLab