提交 f740f34a 编写于 作者: M Matteo Merli 提交者: GitHub

Position Reader on a specific message within a batch (#720)

上级 a9cca056
......@@ -36,7 +36,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);
if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
// both ledgerId and entryId to be Long.max()
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
......
......@@ -32,8 +32,8 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
private final long ledgerId;
private final long entryId;
public static Position earliest = new PositionImpl(-1, -1);
public static Position latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
public static PositionImpl earliest = new PositionImpl(-1, -1);
public static PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
......
......@@ -19,6 +19,11 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.PersistentTopics.getPartitionedTopicMetadata;
import static org.apache.pulsar.broker.lookup.DestinationLookup.lookupDestinationAsync;
import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
......@@ -28,15 +33,11 @@ import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.PersistentTopics.getPartitionedTopicMetadata;
import static org.apache.pulsar.broker.lookup.DestinationLookup.lookupDestinationAsync;
import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
......@@ -53,16 +54,15 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -345,8 +345,9 @@ public class ServerCnx extends PulsarHandler {
final String consumerName = subscribe.getConsumerName();
final boolean isDurable = subscribe.getDurable();
final MessageIdImpl startMessageId = subscribe.hasStartMessageId()
? new MessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition())
? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(),
subscribe.getStartMessageId().getBatchIndex())
: null;
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
......
......@@ -73,6 +73,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.util.FutureUtil;
......@@ -191,9 +192,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
USAGE_COUNT_UPDATER.set(this, 0);
this.dispatchRateLimiter = new DispatchRateLimiter(this);
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
......@@ -465,13 +466,24 @@ public class PersistentTopic implements Topic, AddEntryCallback {
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName, MessageId startMessageId) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
// Create a new non-durable cursor only for the first consumer that connects
Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
// Create a new non-durable cursor only for the first consumer that connects
MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
: (MessageIdImpl) MessageId.latest;
Position startPosition = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages in the batch.
if (((BatchMessageIdImpl)msgId).getBatchIndex() >= 0) {
entryId = msgId.getEntryId() - 1;
}
}
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition);
......@@ -665,9 +677,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
closeFuture.complete(null);
}
}, null);
dispatchRateLimiter.close();
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
......@@ -1415,6 +1427,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public DispatchRateLimiter getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}
private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
......@@ -48,17 +48,6 @@ import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
......@@ -242,10 +231,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", producerConf);
// Produce messages
CompletableFuture<MessageId> lastFuture = null;
for (int i = 0; i < 10; i++) {
producer.send("my-message".getBytes());
lastFuture = producer.sendAsync(("my-message-" + i).getBytes()).thenApply(msgId -> {
log.info("Published message id: {}", msgId);
return msgId;
});
}
lastFuture.get();
Message msg = null;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
......@@ -594,7 +589,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
/**
* Verifies non-batch message size being validated after performing compression while batch-messaging validates
* before compression of message
*
*
* <pre>
* send msg with size > MAX_SIZE (5 MB)
* a. non-batch with compression: pass
......@@ -602,7 +597,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* c. non-batch w/o compression: fail
* d. non-batch with compression, consumer consume: pass
* </pre>
*
*
* @throws Exception
*/
@Test
......@@ -1333,7 +1328,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -1867,7 +1862,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testPriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -1909,10 +1904,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
/**
* a. consumer1 and consumer2 now has more permits (as received and sent more permits)
* b. try to produce more messages: which will again distribute among consumer1 and consumer2
* a. consumer1 and consumer2 now has more permits (as received and sent more permits)
* b. try to produce more messages: which will again distribute among consumer1 and consumer2
* and should not dispatch to consumer4
*
*
*/
for (int i = 0; i < 5; i++) {
final String message = "my-message-" + i;
......@@ -1935,12 +1930,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* <pre>
* Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked
* and unblocked consumers.
*
* 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
*
* 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
* Out of which : c1,c2,c4,c5 will be blocked due to MaxUnackedMessages limit.
* 2. So, dispatcher should moves round-robin and make sure it delivers unblocked consumer : c3
* </pre>
*
*
* @throws Exception
*/
@Test(timeOut=5000)
......@@ -2129,7 +2124,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 5000)
public void testFailReceiveAsyncOnConsumerClose() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -2163,5 +2158,5 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
}
\ No newline at end of file
......@@ -25,12 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -245,4 +240,47 @@ public class TopicReaderTest extends ProducerConsumerBase {
producer.close();
}
@Test
public void testReaderOnSpecificMessageWithBatches() throws Exception {
ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS);
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", producerConf);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.sendAsync(message.getBytes());
}
// Write one sync message to ensure everything before got persistend
producer.send("my-message-10".getBytes());
Reader reader1 = pulsarClient.createReader(
"persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", MessageId.earliest,
new ReaderConfiguration());
MessageId lastMessageId = null;
for (int i = 0; i < 5; i++) {
Message msg = reader1.readNext();
lastMessageId = msg.getMessageId();
}
assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
System.out.println("CREATING READER ON MSG ID: " + lastMessageId);
Reader reader2 = pulsarClient.createReader(
"persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", lastMessageId,
new ReaderConfiguration());
for (int i = 5; i < 11; i++) {
Message msg = reader2.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
assertEquals(receivedMessage, expectedMessage);
}
producer.close();
}
}
\ No newline at end of file
......@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;
/**
......@@ -31,7 +30,16 @@ public class BatchMessageIdImpl extends MessageIdImpl implements Comparable<Mess
this.batchIndex = batchIndex;
}
int getBatchIndex() {
public BatchMessageIdImpl(MessageIdImpl other) {
super(other.ledgerId, other.entryId, other.partitionIndex);
if (other instanceof BatchMessageIdImpl) {
this.batchIndex = ((BatchMessageIdImpl) other).batchIndex;
} else {
this.batchIndex = -1;
}
}
public int getBatchIndex() {
return batchIndex;
}
......
......@@ -51,12 +51,12 @@ import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.slf4j.Logger;
......@@ -100,7 +100,7 @@ public class ConsumerImpl extends ConsumerBase {
protected final ConsumerStats stats;
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private final MessageId startMessageId;
private BatchMessageIdImpl startMessageId;
private volatile boolean hasReachedEndOfTopic;
......@@ -125,7 +125,7 @@ public class ConsumerImpl extends ConsumerBase {
super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.startMessageId = startMessageId;
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
this.partitionIndex = partitionIndex;
......@@ -359,7 +359,7 @@ public class ConsumerImpl extends ConsumerBase {
} finally {
lock.writeLock().unlock();
}
// all messages in this batch have been acked
if (isAllMsgsAcked) {
if (log.isDebugEnabled()) {
......@@ -500,7 +500,6 @@ public class ConsumerImpl extends ConsumerBase {
long requestId = client.newRequestId();
int currentSize;
MessageIdImpl startMessageId;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
......@@ -518,6 +517,10 @@ public class ConsumerImpl extends ConsumerBase {
MessageIdData.Builder builder = MessageIdData.newBuilder();
builder.setLedgerId(startMessageId.getLedgerId());
builder.setEntryId(startMessageId.getEntryId());
if (startMessageId instanceof BatchMessageIdImpl) {
builder.setBatchIndex(((BatchMessageIdImpl) startMessageId).getBatchIndex());
}
startMessageIdData = builder.build();
builder.recycle();
}
......@@ -593,21 +596,31 @@ public class ConsumerImpl extends ConsumerBase {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
private MessageIdImpl clearReceiverQueue() {
private BatchMessageIdImpl clearReceiverQueue() {
List<Message> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
if (!currentMessageQueue.isEmpty()) {
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
MessageIdImpl previousMessage = new MessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex());
BatchMessageIdImpl previousMessage;
if (nextMessageInQueue instanceof BatchMessageIdImpl) {
// Get on the previous message within the current batch
previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId(), nextMessageInQueue.getPartitionIndex(),
((BatchMessageIdImpl) nextMessageInQueue).getBatchIndex() - 1);
} else {
// Get on previous message in previous entry
previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex(), -1);
}
return previousMessage;
} else if (lastDequeuedMessage != null) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return lastDequeuedMessage;
return new BatchMessageIdImpl(lastDequeuedMessage);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return (MessageIdImpl) startMessageId;
return startMessageId;
}
}
......@@ -850,6 +863,8 @@ public class ConsumerImpl extends ConsumerBase {
}
batchMessageAckTracker.put(batchMessage, bitSet);
unAckedMessageTracker.add(batchMessage);
int skippedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
if (log.isDebugEnabled()) {
......@@ -859,6 +874,21 @@ public class ConsumerImpl extends ConsumerBase {
.newBuilder();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);
if (subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId()
&& i <= startMessageId.getBatchIndex()) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription, consumerName);
}
++skippedMessages;
continue;
}
BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i);
final MessageImpl message = new MessageImpl(batchMessageIdImpl, msgMetadata,
......@@ -883,6 +913,10 @@ public class ConsumerImpl extends ConsumerBase {
log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription,
consumerName, incomingMessages.size(), incomingMessages.remainingCapacity());
}
if (skippedMessages > 0) {
increaseAvailablePermits(cnx, skippedMessages);
}
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册