提交 3d9ae3f4 编写于 作者: I Ivan Kelly 提交者: Matteo Merli

Acknowledgement with properties for RawReader (#1006)

* Acknowledgement with properties for RawReader

This will be used by the compactor to update the topic with the
compacted ledger. There will be a cursor/subscription owned by the
compactor which keeps track of where we have compacted until, and the
ID of the ledger into which the compacted data has been written.

This change exposes the cursor's String->Long properties map which is
already available for Producer sequence tracking. This is only exposed
through the RawReader, which is part of the broker module. Clients
will not be able to use it.

* Review comments from @sijie

* Updated protobuf for recycler
上级 1fe28e74
......@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.api.Commands.readChecksum;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
......@@ -251,7 +254,7 @@ public class Consumer {
iter.remove();
PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
subscription.acknowledgeMessage(pos, AckType.Individual);
subscription.acknowledgeMessage(pos, AckType.Individual, Collections.emptyMap());
continue;
}
if (pendingAcks != null) {
......@@ -334,15 +337,21 @@ public class Consumer {
position, ack.getValidationError());
}
Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
.collect(Collectors.toMap((e) -> e.getKey(),
(e) -> e.getValue()));
}
if (subType == SubType.Shared) {
// On shared subscriptions, cumulative ack is not supported
checkArgument(ack.getAckType() == AckType.Individual);
// Only ack a single message
removePendingAcks(position);
subscription.acknowledgeMessage(position, AckType.Individual);
subscription.acknowledgeMessage(position, AckType.Individual, properties);
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
subscription.acknowledgeMessage(position, ack.getAckType(), properties);
}
}
......
......@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
......@@ -39,7 +40,7 @@ public interface Subscription {
void consumerFlow(Consumer consumer, int additionalNumberOfMessages);
void acknowledgeMessage(PositionImpl position, AckType ackType);
void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties);
String getDestination();
......
......@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.nonpersistent;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
......@@ -138,7 +139,7 @@ public class NonPersistentSubscription implements Subscription {
}
@Override
public void acknowledgeMessage(PositionImpl position, AckType ackType) {
public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
// No-op
}
......
......@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
......@@ -175,12 +176,12 @@ public class PersistentSubscription implements Subscription {
}
@Override
public void acknowledgeMessage(PositionImpl position, AckType ackType) {
public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
if (ackType == AckType.Cumulative) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
}
cursor.asyncMarkDelete(position, markDeleteCallback, position);
cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual ack on {}", topicName, subName, position);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
......@@ -30,9 +31,9 @@ public interface RawReader {
/**
* Create a raw reader for a topic.
*/
public static CompletableFuture<RawReader> create(PulsarClient client, String topic) {
public static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, future);
RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, subscription, future);
return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
}
......@@ -49,6 +50,16 @@ public interface RawReader {
*/
CompletableFuture<RawMessage> readNextAsync();
/**
* Acknowledge all messages as read up until <i>messageId</i>. The properties are stored
* with the individual acknowledgement, so later acknowledgements will overwrite all
* properties from previous acknowledgements.
*
* @param messageId to cumulatively acknowledge to
* @param properties a map of properties which will be stored with the acknowledgement
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties);
/**
* Close the raw reader.
*/
......
......@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
......@@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
......@@ -52,12 +54,12 @@ public class RawReaderImpl implements RawReader {
private final ConsumerConfiguration consumerConfiguration;
private RawConsumerImpl consumer;
public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Consumer> consumerFuture) {
public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer> consumerFuture) {
this.client = client;
this.subscription = subscription;
this.topic = topic;
subscription = "raw-reader";
consumerConfiguration = new ConsumerConfiguration();
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
......@@ -76,6 +78,11 @@ public class RawReaderImpl implements RawReader {
return consumer.receiveRawAsync();
}
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties) {
return consumer.doAcknowledge(messageId, AckType.Cumulative, properties);
}
@Override
public CompletableFuture<Void> closeAsync() {
return consumer.closeAsync();
......
......@@ -37,6 +37,7 @@ import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
......@@ -1175,7 +1176,7 @@ public class ServerCnxTest {
PositionImpl pos = new PositionImpl(0, 0);
clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), AckType.Individual,
null);
null, Collections.emptyMap());
channel.writeInbound(clientCommand);
// verify nothing is sent out on the wire after ack
......
......@@ -26,13 +26,19 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
......@@ -41,7 +47,6 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
......@@ -54,6 +59,7 @@ import org.testng.annotations.Test;
public class RawReaderTest extends MockedPulsarServiceBaseTest {
private static final Logger log = LoggerFactory.getLogger(RawReaderTest.class);
private static final String subscription = "foobar-sub";
@BeforeMethod
@Override
......@@ -104,7 +110,7 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
Set<String> keys = publishMessages(topic, numKeys);
RawReader reader = RawReader.create(pulsarClient, topic).get();
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
try {
while (true) { // should break out with TimeoutException
try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
......@@ -126,7 +132,7 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
publishMessages(topic, numKeys);
Set<String> readKeys = new HashSet<>();
RawReader reader = RawReader.create(pulsarClient, topic).get();
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
try {
while (true) { // should break out with TimeoutException
try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
......@@ -161,7 +167,7 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
publishMessages(topic, numKeys);
Set<String> readKeys = new HashSet<>();
RawReader reader = RawReader.create(pulsarClient, topic).get();
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
int i = 0;
MessageId seekTo = null;
try {
......@@ -206,7 +212,7 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
publishMessages(topic, numMessages);
RawReader reader = RawReader.create(pulsarClient, topic).get();
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
List<Future<RawMessage>> futures = new ArrayList<>();
Set<String> keys = new HashSet<>();
......@@ -226,4 +232,44 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(timeouts, 1);
Assert.assertEquals(keys.size(), numMessages);
}
@Test
public void testAcknowledgeWithProperties() throws Exception {
int numKeys = 10;
String topic = "persistent://my-property/use/my-ns/my-raw-topic";
Set<String> keys = publishMessages(topic, numKeys);
MessageId lastMessageId = null;
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
try {
while (true) { // should break out with TimeoutException
try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
lastMessageId = m.getMessageId();
Assert.assertTrue(keys.remove(extractKey(m)));
}
}
} catch (TimeoutException te) {
// ok
}
Assert.assertTrue(keys.isEmpty());
Map<String,Long> properties = new HashMap<>();
properties.put("foobar", 0xdeadbeefdecaL);
reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(5, TimeUnit.SECONDS);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
ManagedLedger ledger = topicRef.getManagedLedger();
for (int i = 0; i < 30; i++) {
if (ledger.openCursor(subscription).getProperties().get("foobar") == Long.valueOf(0xdeadbeefdecaL)) {
break;
}
Thread.sleep(100);
}
Assert.assertEquals(ledger.openCursor(subscription).getProperties().get("foobar"),
Long.valueOf(0xdeadbeefdecaL));
}
}
......@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.impl;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
......@@ -225,7 +227,7 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
@Override
public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
return doAcknowledge(messageId, AckType.Individual);
return doAcknowledge(messageId, AckType.Individual, Collections.emptyMap());
}
@Override
......@@ -235,10 +237,11 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
"Cannot use cumulative acks on a non-exclusive subscription"));
}
return doAcknowledge(messageId, AckType.Cumulative);
return doAcknowledge(messageId, AckType.Cumulative, Collections.emptyMap());
}
abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType);
abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties);
@Override
public void unsubscribe() throws PulsarClientException {
......
......@@ -28,7 +28,9 @@ import static org.apache.pulsar.common.api.Commands.readChecksum;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
......@@ -322,7 +324,8 @@ public class ConsumerImpl extends ConsumerBase {
// we may not be able to ack message being acked by client. However messages in prior
// batch may be ackable
private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message) {
private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message,
Map<String,Long> properties) {
// get entry before this message and ack that message on broker
MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
if (lowerKey != null) {
......@@ -334,7 +337,7 @@ public class ConsumerImpl extends ConsumerBase {
log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", subscription,
consumerId, lowerKey, batchMessageId);
}
sendAcknowledge(lowerKey, AckType.Cumulative);
sendAcknowledge(lowerKey, AckType.Cumulative, properties);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] no messages prior to message {}", subscription, consumerId, batchMessageId);
......@@ -342,7 +345,8 @@ public class ConsumerImpl extends ConsumerBase {
}
}
boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType) {
boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType,
Map<String,Long> properties) {
// we keep track of entire batch and so need MessageIdImpl and cannot use BatchMessageIdImpl
MessageIdImpl message = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
......@@ -396,7 +400,7 @@ public class ConsumerImpl extends ConsumerBase {
} else {
// we cannot ack this message to broker. but prior message may be ackable
if (ackType == AckType.Cumulative) {
ackMessagesInEarlierBatch(batchMessageId, message);
ackMessagesInEarlierBatch(batchMessageId, message, properties);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription,
......@@ -439,7 +443,8 @@ public class ConsumerImpl extends ConsumerBase {
}
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType) {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) {
checkArgument(messageId instanceof MessageIdImpl);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
......@@ -447,7 +452,7 @@ public class ConsumerImpl extends ConsumerBase {
}
if (messageId instanceof BatchMessageIdImpl) {
if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType)) {
if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) {
// all messages in batch have been acked so broker can be acked via sendAcknowledge()
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] acknowledging message - {}, acktype {}", subscription, consumerName, messageId,
......@@ -463,12 +468,14 @@ public class ConsumerImpl extends ConsumerBase {
if (ackType == AckType.Cumulative && !(messageId instanceof BatchMessageIdImpl)) {
updateBatchAckTracker((MessageIdImpl) messageId, ackType);
}
return sendAcknowledge(messageId, ackType);
return sendAcknowledge(messageId, ackType, properties);
}
private CompletableFuture<Void> sendAcknowledge(MessageId messageId, AckType ackType) {
private CompletableFuture<Void> sendAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), ackType, null);
final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(),
ackType, null, properties);
// There's no actual response from ack messages
final CompletableFuture<Void> ackFuture = new CompletableFuture<Void>();
......@@ -1094,7 +1101,7 @@ public class ConsumerImpl extends ConsumerBase {
private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), AckType.Individual,
validationError);
validationError, Collections.emptyMap());
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
increaseAvailablePermits(currentCnx);
stats.incrementNumReceiveFailed();
......
......@@ -236,7 +236,8 @@ public class PartitionedConsumerImpl extends ConsumerBase {
}
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType) {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) {
checkArgument(messageId instanceof MessageIdImpl);
if (getState() != State.Ready) {
......@@ -249,7 +250,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
} else {
ConsumerImpl consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex());
return consumer.doAcknowledge(messageId, ackType).thenRun(() ->
return consumer.doAcknowledge(messageId, ackType, properties).thenRun(() ->
unAckedMessageTracker.remove((MessageIdImpl) messageId));
}
......
......@@ -23,6 +23,7 @@ import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
......@@ -517,7 +518,7 @@ public class Commands {
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType,
ValidationError validationError) {
ValidationError validationError, Map<String,Long> properties) {
CommandAck.Builder ackBuilder = CommandAck.newBuilder();
ackBuilder.setConsumerId(consumerId);
ackBuilder.setAckType(ackType);
......@@ -529,6 +530,10 @@ public class Commands {
if (validationError != null) {
ackBuilder.setValidationError(validationError);
}
for (Map.Entry<String,Long> e : properties.entrySet()) {
ackBuilder.addProperties(
PulsarApi.KeyLongValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
}
CommandAck ack = ackBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
......
......@@ -34,6 +34,11 @@ message KeyValue {
required string value = 2;
}
message KeyLongValue {
required string key = 1;
required uint64 value = 2;
}
message EncryptionKeys {
required string key = 1;
required bytes value = 2;
......@@ -280,6 +285,7 @@ message CommandAck {
}
optional ValidationError validation_error = 4;
repeated KeyLongValue properties = 5;
}
message CommandFlow {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册