提交 6bd11382 编写于 作者: C cckellogg 提交者: Matteo Merli

Add optional key/value metadata to consumers. (#1031)

上级 5263e64d
......@@ -96,8 +96,10 @@ public class Consumer {
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;
private final Map<String, String> metadata;
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {
int maxUnackedMessages, ServerCnx cnx, String appId, Map<String, String> metadata) throws BrokerServiceException {
this.subscription = subscription;
this.subType = subType;
......@@ -114,11 +116,14 @@ public class Consumer {
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
this.metadata = metadata != null ? metadata : Collections.emptyMap();
stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DateFormatter.now();
stats.clientVersion = cnx.getClientVersion();
stats.metadata = this.metadata;
if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
......
......@@ -367,6 +367,7 @@ public class ServerCnx extends PulsarHandler {
: null;
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
......@@ -376,6 +377,14 @@ public class ServerCnx extends PulsarHandler {
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
return null;
}
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
......@@ -400,7 +409,7 @@ public class ServerCnx extends PulsarHandler {
}
service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId))
consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
......@@ -77,7 +78,8 @@ public interface Topic {
void removeProducer(Producer producer);
CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId);
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata);
CompletableFuture<Subscription> createSubscription(String subscriptionName);
......
......@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
......@@ -295,7 +296,8 @@ public class NonPersistentTopic implements Topic {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) {
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
......@@ -334,7 +336,7 @@ public class NonPersistentTopic implements Topic {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole());
cnx.getRole(), metadata);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
......
......@@ -25,10 +25,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -385,7 +382,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) {
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
......@@ -436,7 +434,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole());
maxUnackedMessages, cnx, cnx.getRole(), metadata);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
......
......@@ -34,6 +34,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
......@@ -208,7 +209,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1");
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
......@@ -225,7 +226,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
......@@ -233,7 +234,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1");
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap());
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
......@@ -440,7 +441,9 @@ public class PersistentDispatcherFailoverConsumerTest {
}
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer = new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000, serverCnx, "appId");
Consumer consumer =
new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap());
try {
consumer.flowPermits(permit);
} catch (Exception e) {
......
......@@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
......@@ -119,7 +120,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -177,7 +178,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -239,7 +240,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -297,7 +298,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......
......@@ -42,6 +42,7 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
......@@ -362,7 +363,7 @@ public class PersistentTopicTest {
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
try {
f1.get();
fail("should fail with exception");
......@@ -381,12 +382,12 @@ public class PersistentTopicTest {
// 1. simple subscribe
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
try {
f2.get();
......@@ -410,7 +411,7 @@ public class PersistentTopicTest {
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
50000, serverCnx, "myrole-1", Collections.emptyMap());
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
......@@ -440,7 +441,7 @@ public class PersistentTopicTest {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
50000, serverCnx, "myrole-1", Collections.emptyMap());
sub.addConsumer(consumer1);
doAnswer(new Answer<Object>() {
......@@ -462,7 +463,7 @@ public class PersistentTopicTest {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
50000, serverCnx, "myrole-1", Collections.emptyMap());
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
}
......@@ -492,7 +493,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
assertTrue(topic.delete().isCompletedExceptionally());
......@@ -507,7 +508,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -561,7 +562,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
......@@ -648,7 +649,7 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap());
try {
f.get();
......@@ -759,7 +760,7 @@ public class PersistentTopicTest {
// 1. Subscribe with non partition topic
Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null);
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap());
f1.get();
// 2. Subscribe with partition topic
......@@ -770,7 +771,7 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null);
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap());
f2.get();
// 3. Subscribe and create second consumer
......@@ -779,7 +780,7 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null);
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap());
f3.get();
assertEquals(
......@@ -799,7 +800,7 @@ public class PersistentTopicTest {
.setSubType(SubType.Failover).build();
Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null);
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap());
f4.get();
assertEquals(
......@@ -824,7 +825,7 @@ public class PersistentTopicTest {
.setSubType(SubType.Exclusive).build();
Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null);
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap());
try {
f5.get();
......@@ -840,7 +841,7 @@ public class PersistentTopicTest {
.setSubType(SubType.Exclusive).build();
Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null);
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap());
f6.get();
// 7. unsubscribe exclusive sub
......
......@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
......@@ -55,6 +57,8 @@ public class ConsumerConfiguration implements Serializable {
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
private final Map<String, String> properties = new HashMap<>();
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
......@@ -242,4 +246,33 @@ public class ConsumerConfiguration implements Serializable {
public void setPriorityLevel(int priorityLevel) {
this.priorityLevel = priorityLevel;
}
/**
* Set a name/value property with this consumer.
* @param key
* @param value
* @return
*/
public ConsumerConfiguration setProperty(String key, String value) {
checkArgument(key != null);
checkArgument(value != null);
properties.put(key, value);
return this;
}
/**
* Add all the properties in the provided map
* @param properties
* @return
*/
public ConsumerConfiguration setProperties(Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
return this;
}
public Map<String, String> getProperties() {
return properties;
}
}
......@@ -26,14 +26,7 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
......@@ -111,7 +104,9 @@ public class ConsumerImpl extends ConsumerBase {
private MessageCrypto msgCrypto = null;
static enum SubscriptionMode {
private final Map<String, String> metadata;
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Durable,
......@@ -164,6 +159,12 @@ public class ConsumerImpl extends ConsumerBase {
this.msgCrypto = new MessageCrypto(logCtx, false);
}
if (conf.getProperties().isEmpty()) {
metadata = Collections.emptyMap();
} else {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
grabCnx();
}
......@@ -549,7 +550,7 @@ public class ConsumerImpl extends ConsumerBase {
}
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
consumerName, isDurable, startMessageIdData);
consumerName, isDurable, startMessageIdData, metadata);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
......
......@@ -33,6 +33,10 @@ public final class CommandUtils {
return toMap(commandProducer.getMetadataList());
}
public static Map<String, String> metadataFromCommand(PulsarApi.CommandSubscribe commandSubscribe) {
return toMap(commandSubscribe.getMetadataList());
}
static List<PulsarApi.KeyValue> toKeyValueList(Map<String, String> metadata) {
if (metadata == null || metadata.isEmpty()) {
return Collections.emptyList();
......
......@@ -312,11 +312,12 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
true /* isDurable */, null /* startMessageId */ );
true /* isDurable */, null /* startMessageId */, Collections.emptyMap());
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId) {
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
......@@ -329,6 +330,8 @@ public class Commands {
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
CommandSubscribe subscribe = subscribeBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe));
subscribeBuilder.recycle();
......
......@@ -5629,6 +5629,12 @@ public final class PulsarApi {
// optional .pulsar.proto.MessageIdData start_message_id = 9;
boolean hasStartMessageId();
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getStartMessageId();
// repeated .pulsar.proto.KeyValue metadata = 10;
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>
getMetadataList();
org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index);
int getMetadataCount();
}
public static final class CommandSubscribe extends
com.google.protobuf.GeneratedMessageLite
......@@ -5867,6 +5873,27 @@ public final class PulsarApi {
return startMessageId_;
}
// repeated .pulsar.proto.KeyValue metadata = 10;
public static final int METADATA_FIELD_NUMBER = 10;
private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> metadata_;
public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> getMetadataList() {
return metadata_;
}
public java.util.List<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder>
getMetadataOrBuilderList() {
return metadata_;
}
public int getMetadataCount() {
return metadata_.size();
}
public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index) {
return metadata_.get(index);
}
public org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder getMetadataOrBuilder(
int index) {
return metadata_.get(index);
}
private void initFields() {
topic_ = "";
subscription_ = "";
......@@ -5877,6 +5904,7 @@ public final class PulsarApi {
priorityLevel_ = 0;
durable_ = true;
startMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
metadata_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
......@@ -5909,6 +5937,12 @@ public final class PulsarApi {
return false;
}
}
for (int i = 0; i < getMetadataCount(); i++) {
if (!getMetadata(i).isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
......@@ -5948,6 +5982,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeMessage(9, startMessageId_);
}
for (int i = 0; i < metadata_.size(); i++) {
output.writeMessage(10, metadata_.get(i));
}
}
private int memoizedSerializedSize = -1;
......@@ -5992,6 +6029,10 @@ public final class PulsarApi {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(9, startMessageId_);
}
for (int i = 0; i < metadata_.size(); i++) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, metadata_.get(i));
}
memoizedSerializedSize = size;
return size;
}
......@@ -6123,6 +6164,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000080);
startMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
bitField0_ = (bitField0_ & ~0x00000100);
metadata_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
......@@ -6192,6 +6235,11 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000100;
}
result.startMessageId_ = startMessageId_;
if (((bitField0_ & 0x00000200) == 0x00000200)) {
metadata_ = java.util.Collections.unmodifiableList(metadata_);
bitField0_ = (bitField0_ & ~0x00000200);
}
result.metadata_ = metadata_;
result.bitField0_ = to_bitField0_;
return result;
}
......@@ -6225,6 +6273,16 @@ public final class PulsarApi {
if (other.hasStartMessageId()) {
mergeStartMessageId(other.getStartMessageId());
}
if (!other.metadata_.isEmpty()) {
if (metadata_.isEmpty()) {
metadata_ = other.metadata_;
bitField0_ = (bitField0_ & ~0x00000200);
} else {
ensureMetadataIsMutable();
metadata_.addAll(other.metadata_);
}
}
return this;
}
......@@ -6255,6 +6313,12 @@ public final class PulsarApi {
return false;
}
}
for (int i = 0; i < getMetadataCount(); i++) {
if (!getMetadata(i).isInitialized()) {
return false;
}
}
return true;
}
......@@ -6334,6 +6398,12 @@ public final class PulsarApi {
subBuilder.recycle();
break;
}
case 82: {
org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder();
input.readMessage(subBuilder, extensionRegistry);
addMetadata(subBuilder.buildPartial());
break;
}
}
}
}
......@@ -6599,6 +6669,95 @@ public final class PulsarApi {
return this;
}
// repeated .pulsar.proto.KeyValue metadata = 10;
private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> metadata_ =
java.util.Collections.emptyList();
private void ensureMetadataIsMutable() {
if (!((bitField0_ & 0x00000200) == 0x00000200)) {
metadata_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>(metadata_);
bitField0_ |= 0x00000200;
}
}
public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> getMetadataList() {
return java.util.Collections.unmodifiableList(metadata_);
}
public int getMetadataCount() {
return metadata_.size();
}
public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index) {
return metadata_.get(index);
}
public Builder setMetadata(
int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
if (value == null) {
throw new NullPointerException();
}
ensureMetadataIsMutable();
metadata_.set(index, value);
return this;
}
public Builder setMetadata(
int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
ensureMetadataIsMutable();
metadata_.set(index, builderForValue.build());
return this;
}
public Builder addMetadata(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
if (value == null) {
throw new NullPointerException();
}
ensureMetadataIsMutable();
metadata_.add(value);
return this;
}
public Builder addMetadata(
int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
if (value == null) {
throw new NullPointerException();
}
ensureMetadataIsMutable();
metadata_.add(index, value);
return this;
}
public Builder addMetadata(
org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
ensureMetadataIsMutable();
metadata_.add(builderForValue.build());
return this;
}
public Builder addMetadata(
int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
ensureMetadataIsMutable();
metadata_.add(index, builderForValue.build());
return this;
}
public Builder addAllMetadata(
java.lang.Iterable<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> values) {
ensureMetadataIsMutable();
super.addAll(values, metadata_);
return this;
}
public Builder clearMetadata() {
metadata_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
public Builder removeMetadata(int index) {
ensureMetadataIsMutable();
metadata_.remove(index);
return this;
}
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
}
......
......@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
/**
......@@ -53,6 +55,9 @@ public class ConsumerStats {
/** Client library version */
public String clientVersion;
/** Metadata (key/value strings) associated with this consumer */
public Map<String, String> metadata;
public ConsumerStats add(ConsumerStats stats) {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
......
......@@ -178,6 +178,9 @@ message CommandSubscribe {
// markd-delete position on the particular message id and
// will send messages from that point
optional MessageIdData start_message_id = 9;
/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;
}
message CommandPartitionedTopicMetadata {
......
......@@ -46,15 +46,15 @@ public class CommandUtilsTests {
}
@Test
public void testMetadataFromCommand() {
Map<String, String> metadata = CommandUtils.metadataFromCommand(newCommand(null, null));
public void testMetadataFromCommandProducer() {
Map<String, String> metadata = CommandUtils.metadataFromCommand(newCommandProducer(null, null));
Assert.assertNotNull(metadata);
Assert.assertTrue(metadata.isEmpty());
final String key = "key";
final String value = "value";
PulsarApi.CommandProducer cmd = newCommand(key, value);
PulsarApi.CommandProducer cmd = newCommandProducer(key, value);
metadata = CommandUtils.metadataFromCommand(cmd);
Assert.assertEquals(1, metadata.size());
final Map.Entry<String, String> entry = metadata.entrySet().iterator().next();
......@@ -62,7 +62,24 @@ public class CommandUtilsTests {
Assert.assertEquals(value, entry.getValue());
}
private PulsarApi.CommandProducer newCommand(String key, String value) {
@Test
public void testMetadataFromCommandSubscribe() {
Map<String, String> metadata = CommandUtils.metadataFromCommand(newCommandSubscribe(null, null));
Assert.assertNotNull(metadata);
Assert.assertTrue(metadata.isEmpty());
final String key = "key";
final String value = "value";
PulsarApi.CommandSubscribe cmd = newCommandSubscribe(key, value);
metadata = CommandUtils.metadataFromCommand(cmd);
Assert.assertEquals(1, metadata.size());
final Map.Entry<String, String> entry = metadata.entrySet().iterator().next();
Assert.assertEquals(key, entry.getKey());
Assert.assertEquals(value, entry.getValue());
}
private PulsarApi.CommandProducer newCommandProducer(String key, String value) {
PulsarApi.CommandProducer.Builder cmd = PulsarApi.CommandProducer.newBuilder()
.setProducerId(1)
.setRequestId(1)
......@@ -75,4 +92,19 @@ public class CommandUtilsTests {
return cmd.build();
}
private PulsarApi.CommandSubscribe newCommandSubscribe(String key, String value) {
PulsarApi.CommandSubscribe.Builder cmd = PulsarApi.CommandSubscribe.newBuilder()
.setConsumerId(1)
.setRequestId(1)
.setTopic("my-topic")
.setSubscription("my-subscription")
.setSubType(PulsarApi.CommandSubscribe.SubType.Shared);
if (key != null && value != null) {
cmd.addMetadata(PulsarApi.KeyValue.newBuilder().setKey(key).setValue(value).build());
}
return cmd.build();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册