提交 b68cc80f 编写于 作者: I Ivan Kelly 提交者: Matteo Merli

Read compacted consumer flag (#1136)

* Read compacted consumer flag

When enabled, a consumer will read messages from a compacted topic
ledger if available. When disabled, the consumer will read from the
message backlog as before, even if the topic has been compacted.

The logic for actually reading from the compacted topic will be
submitted in following patch.

* readcompacted only works on non-shared subs on persistent topics

Document this in the javadoc and add server side validation

* Add client side validation

Of course these occlude the server side validations, so they can't be
tested without doing something nasty.
上级 13da9352
......@@ -69,6 +69,7 @@ public class Consumer {
private final long consumerId;
private final int priorityLevel;
private final boolean readCompacted;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
......@@ -98,14 +99,17 @@ public class Consumer {
private final Map<String, String> metadata;
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId, Map<String, String> metadata) throws BrokerServiceException {
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted) throws BrokerServiceException {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
......@@ -145,6 +149,10 @@ public class Consumer {
return consumerName;
}
public boolean readCompacted() {
return readCompacted;
}
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries object.</b>
......
......@@ -483,6 +483,7 @@ public class ServerCnx extends PulsarHandler {
: null;
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
authorizationFuture.thenApply(isAuthorized -> {
......@@ -528,7 +529,8 @@ public class ServerCnx extends PulsarHandler {
service.getTopic(topicName)
.thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
......
......@@ -79,7 +79,7 @@ public interface Topic {
CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata);
Map<String, String> metadata, boolean readCompacted);
CompletableFuture<Subscription> createSubscription(String subscriptionName);
......
......@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
......@@ -297,7 +298,7 @@ public class NonPersistentTopic implements Topic {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {
Map<String, String> metadata, boolean readCompacted) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
......@@ -315,6 +316,11 @@ public class NonPersistentTopic implements Topic {
return future;
}
if (readCompacted) {
future.completeExceptionally(new NotAllowedException("readCompacted only valid on persistent topics"));
return future;
}
lock.readLock().lock();
try {
if (isFenced) {
......@@ -336,7 +342,7 @@ public class NonPersistentTopic implements Topic {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata);
cnx.getRole(), metadata, readCompacted);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
......
......@@ -56,6 +56,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
......@@ -396,10 +397,15 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {
Map<String, String> metadata, boolean readCompacted) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
future.completeExceptionally(
new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
return future;
}
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
......@@ -447,7 +453,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole(), metadata);
maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
......
......@@ -209,7 +209,8 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
......@@ -226,7 +227,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap());
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
......@@ -234,7 +235,8 @@ public class PersistentDispatcherFailoverConsumerTest {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */);
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
......@@ -443,7 +445,7 @@ public class PersistentDispatcherFailoverConsumerTest {
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer =
new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap());
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */);
try {
consumer.flowPermits(permit);
} catch (Exception e) {
......
......@@ -120,7 +120,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -178,7 +178,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -240,7 +240,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -298,7 +298,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......
......@@ -363,7 +363,7 @@ public class PersistentTopicTest {
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
try {
f1.get();
fail("should fail with exception");
......@@ -382,12 +382,12 @@ public class PersistentTopicTest {
// 1. simple subscribe
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
try {
f2.get();
......@@ -411,7 +411,7 @@ public class PersistentTopicTest {
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap());
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
......@@ -441,7 +441,7 @@ public class PersistentTopicTest {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap());
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
sub.addConsumer(consumer1);
doAnswer(new Answer<Object>() {
......@@ -463,7 +463,7 @@ public class PersistentTopicTest {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap());
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
}
......@@ -493,7 +493,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */);
f1.get();
assertTrue(topic.delete().isCompletedExceptionally());
......@@ -508,7 +508,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -562,7 +562,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -649,7 +649,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
try {
f.get();
......@@ -760,7 +760,8 @@ public class PersistentTopicTest {
// 1. Subscribe with non partition topic
Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap());
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
cmd1.getReadCompacted());
f1.get();
// 2. Subscribe with partition topic
......@@ -771,7 +772,8 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap());
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
cmd2.getReadCompacted());
f2.get();
// 3. Subscribe and create second consumer
......@@ -780,7 +782,8 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap());
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
cmd3.getReadCompacted());
f3.get();
assertEquals(
......@@ -800,7 +803,8 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap());
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
cmd4.getReadCompacted());
f4.get();
assertEquals(
......@@ -825,7 +829,8 @@ public class PersistentTopicTest {
.setSubType(SubType.Exclusive).build();
Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap());
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
cmd5.getReadCompacted());
try {
f5.get();
......@@ -841,7 +846,8 @@ public class PersistentTopicTest {
.setSubType(SubType.Exclusive).build();
Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap());
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
cmd6.getReadCompacted());
f6.get();
// 7. unsubscribe exclusive sub
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ConsumerConfigurationTest extends MockedPulsarServiceBaseTest {
private static final Logger log = LoggerFactory.getLogger(ConsumerConfigurationTest.class);
private static String persistentTopic = "persistent://my-property/use/my-ns/persist";
private static String nonPersistentTopic = "non-persistent://my-property/use/my-ns/nopersist";
@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster("use",
new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
}
@AfterMethod
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testReadCompactPersistentExclusive() throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration().setReadCompacted(true);
conf.setSubscriptionType(SubscriptionType.Exclusive);
pulsarClient.subscribe(persistentTopic, "sub1", conf).close();
}
@Test
public void testReadCompactPersistentFailover() throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration().setReadCompacted(true);
conf.setSubscriptionType(SubscriptionType.Failover);
pulsarClient.subscribe(persistentTopic, "sub1", conf).close();
}
@Test(expectedExceptions=InvalidConfigurationException.class)
public void testReadCompactPersistentShared() throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration().setReadCompacted(true);
conf.setSubscriptionType(SubscriptionType.Shared);
pulsarClient.subscribe(persistentTopic, "sub1", conf).close();
}
@Test(expectedExceptions=InvalidConfigurationException.class)
public void testReadCompactNonPersistentExclusive() throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration().setReadCompacted(true);
conf.setSubscriptionType(SubscriptionType.Exclusive);
pulsarClient.subscribe(nonPersistentTopic, "sub1", conf).close();
}
}
......@@ -61,6 +61,8 @@ public class ConsumerConfiguration implements Serializable {
private final Map<String, String> properties = new HashMap<>();
private boolean readCompacted = false;
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
......@@ -270,6 +272,27 @@ public class ConsumerConfiguration implements Serializable {
this.priorityLevel = priorityLevel;
}
public boolean getReadCompacted() {
return readCompacted;
}
/**
* If enabled, the consumer will read messages from the compacted topic rather than reading the full message
* backlog of the topic. This means that, if the topic has been compacted, the consumer will only see the latest
* value for each key in the topic, up until the point in the topic message backlog that has been compacted.
* Beyond that point, the messages will be sent as normal.
*
* readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer
* (i.e. failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent
* topics or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
*
* @param readCompacted whether to read from the compacted topic
*/
public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
return this;
}
/**
* Set a name/value property with this consumer.
* @param key
......
......@@ -106,6 +106,8 @@ public class ConsumerImpl extends ConsumerBase {
private final Map<String, String> metadata;
private final boolean readCompacted;
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
......@@ -135,6 +137,8 @@ public class ConsumerImpl extends ConsumerBase {
this.codecProvider = new CompressionCodecProvider();
this.priorityLevel = conf.getPriorityLevel();
this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
this.readCompacted = conf.getReadCompacted();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStats(client, conf, this);
} else {
......@@ -550,7 +554,7 @@ public class ConsumerImpl extends ConsumerBase {
}
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
consumerName, isDurable, startMessageIdData, metadata);
consumerName, isDurable, startMessageIdData, metadata, readCompacted);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
......
......@@ -40,8 +40,10 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
......@@ -240,6 +242,14 @@ public class PulsarClientImpl implements PulsarClient {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
if (conf.getReadCompacted()
&& (!DestinationName.get(topic).getDomain().equals(DestinationDomain.persistent)
|| (conf.getSubscriptionType() != SubscriptionType.Exclusive
&& conf.getSubscriptionType() != SubscriptionType.Failover))) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException(
"Read compacted can only be used with exclusive of failover persistent subscriptions"));
}
CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
......
......@@ -312,12 +312,12 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
true /* isDurable */, null /* startMessageId */, Collections.emptyMap());
true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false);
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata) {
Map<String, String> metadata, boolean readCompacted) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
......@@ -327,6 +327,7 @@ public class Commands {
subscribeBuilder.setRequestId(requestId);
subscribeBuilder.setPriorityLevel(priorityLevel);
subscribeBuilder.setDurable(isDurable);
subscribeBuilder.setReadCompacted(readCompacted);
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
......
......@@ -5638,6 +5638,10 @@ public final class PulsarApi {
getMetadataList();
org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index);
int getMetadataCount();
// optional bool read_compacted = 11;
boolean hasReadCompacted();
boolean getReadCompacted();
}
public static final class CommandSubscribe extends
com.google.protobuf.GeneratedMessageLite
......@@ -5897,6 +5901,16 @@ public final class PulsarApi {
return metadata_.get(index);
}
// optional bool read_compacted = 11;
public static final int READ_COMPACTED_FIELD_NUMBER = 11;
private boolean readCompacted_;
public boolean hasReadCompacted() {
return ((bitField0_ & 0x00000200) == 0x00000200);
}
public boolean getReadCompacted() {
return readCompacted_;
}
private void initFields() {
topic_ = "";
subscription_ = "";
......@@ -5908,6 +5922,7 @@ public final class PulsarApi {
durable_ = true;
startMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
metadata_ = java.util.Collections.emptyList();
readCompacted_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
......@@ -5988,6 +6003,9 @@ public final class PulsarApi {
for (int i = 0; i < metadata_.size(); i++) {
output.writeMessage(10, metadata_.get(i));
}
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeBool(11, readCompacted_);
}
}
private int memoizedSerializedSize = -1;
......@@ -6036,6 +6054,10 @@ public final class PulsarApi {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, metadata_.get(i));
}
if (((bitField0_ & 0x00000200) == 0x00000200)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(11, readCompacted_);
}
memoizedSerializedSize = size;
return size;
}
......@@ -6169,6 +6191,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000100);
metadata_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000200);
readCompacted_ = false;
bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
......@@ -6243,6 +6267,10 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000200);
}
result.metadata_ = metadata_;
if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
to_bitField0_ |= 0x00000200;
}
result.readCompacted_ = readCompacted_;
result.bitField0_ = to_bitField0_;
return result;
}
......@@ -6286,6 +6314,9 @@ public final class PulsarApi {
}
}
if (other.hasReadCompacted()) {
setReadCompacted(other.getReadCompacted());
}
return this;
}
......@@ -6407,6 +6438,11 @@ public final class PulsarApi {
addMetadata(subBuilder.buildPartial());
break;
}
case 88: {
bitField0_ |= 0x00000400;
readCompacted_ = input.readBool();
break;
}
}
}
}
......@@ -6761,6 +6797,27 @@ public final class PulsarApi {
return this;
}
// optional bool read_compacted = 11;
private boolean readCompacted_ ;
public boolean hasReadCompacted() {
return ((bitField0_ & 0x00000400) == 0x00000400);
}
public boolean getReadCompacted() {
return readCompacted_;
}
public Builder setReadCompacted(boolean value) {
bitField0_ |= 0x00000400;
readCompacted_ = value;
return this;
}
public Builder clearReadCompacted() {
bitField0_ = (bitField0_ & ~0x00000400);
readCompacted_ = false;
return this;
}
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
}
......
......@@ -183,6 +183,8 @@ message CommandSubscribe {
/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;
optional bool read_compacted = 11;
}
message CommandPartitionedTopicMetadata {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册