提交 3345d24d 编写于 作者: R Rajan 提交者: Matteo Merli

Add server-side lookup throttling (#181)

上级 92b79d0e
......@@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000
### --- Authentication --- ###
# Enable authentication
......
......@@ -88,6 +88,9 @@ public class ServiceConfiguration implements PulsarConfiguration{
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
@FieldContext(dynamic = true)
private int maxConcurrentLookupRequest = 10000;
/***** --- TLS --- ****/
// Enable TLS
......@@ -415,6 +418,14 @@ public class ServiceConfiguration implements PulsarConfiguration{
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}
public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}
public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) {
this.maxConcurrentLookupRequest = maxConcurrentLookupRequest;
}
public boolean isTlsEnabled() {
return tlsEnabled;
}
......
......@@ -67,6 +67,12 @@ public class DestinationLookup extends PulsarWebResource {
@Suspended AsyncResponse asyncResponse) {
dest = Codec.decode(dest);
DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest);
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
try {
validateClusterOwnership(topic.getCluster());
......@@ -75,12 +81,12 @@ public class DestinationLookup extends PulsarWebResource {
} catch (WebApplicationException we) {
// Validation checks failed
log.error("Validation check failed: {}", we.getMessage());
asyncResponse.resume(we);
completeLookupResponseExceptionally(asyncResponse, we);
return;
} catch (Throwable t) {
// Validation checks failed with unknown error
log.error("Validation check failed: {}", t.getMessage(), t);
asyncResponse.resume(new RestException(t));
completeLookupResponseExceptionally(asyncResponse, new RestException(t));
return;
}
......@@ -90,7 +96,7 @@ public class DestinationLookup extends PulsarWebResource {
lookupFuture.thenAccept(result -> {
if (result == null) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
......@@ -105,24 +111,24 @@ public class DestinationLookup extends PulsarWebResource {
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}
asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build()));
} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData());
}
asyncResponse.resume(result.getLookupData());
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception);
asyncResponse.resume(exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});
......@@ -236,6 +242,16 @@ public class DestinationLookup extends PulsarWebResource {
return lookupfuture;
}
private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(t);
}
private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(lookupData);
}
private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
......@@ -34,7 +34,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
......@@ -128,6 +130,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
private final ScheduledExecutorService backlogQuotaChecker;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
......@@ -206,7 +210,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), true));
PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
......@@ -619,6 +626,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
public Map<String, NamespaceBundleStats> getBundleStats() {
return pulsarStats.getBundleStats();
}
public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}
public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
......@@ -841,7 +852,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
public AuthenticationService getAuthenticationService() {
return authenticationService;
}
public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
}
......@@ -857,7 +868,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void updateConfigurationAndRegisterListeners() {
// update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();
//add more listeners here
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(pendingLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) pendingLookupRequest, true)));
// add more listeners here
}
/**
......
......@@ -124,4 +124,4 @@ public class BrokerServiceException extends Exception {
return PulsarApi.ServerError.UnknownError;
}
}
}
}
\ No newline at end of file
......@@ -159,15 +159,28 @@ public class ServerCnx extends PulsarHandler {
}
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
ctx.writeAndFlush(lookupResponse);
}).exceptionally(ex -> {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(
newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed lookup due to too many lookup-requets {}", remoteAddress, topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}
@Override
......@@ -177,24 +190,36 @@ public class ServerCnx extends PulsarHandler {
}
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.thenAccept(metadata -> {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic,
ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
} else {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress,
topic, ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", remoteAddress,
topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}
@Override
......@@ -543,7 +568,6 @@ public class ServerCnx extends PulsarHandler {
});
}
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
checkArgument(state == State.Connected);
......
......@@ -27,6 +27,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
......@@ -108,6 +110,7 @@ public class HttpDestinationLookupv2Test {
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(auth).when(brokerService).getAuthorizationManager();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
}
@Test
......@@ -134,6 +137,35 @@ public class HttpDestinationLookupv2Test {
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
}
@Test
public void testNotEnoughLookupPermits() throws Exception {
BrokerService brokerService = pulsar.getBrokerService();
doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore();
DestinationLookup destLookup = spy(new DestinationLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupDestinationAsync("myprop", "usc", "ns2", "topic1", false, asyncResponse1);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
assertEquals(arg.getValue().getClass(), WebApplicationException.class);
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode());
}
@Test
public void testValidateReplicationSettingsOnNamespace() throws Exception {
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.service;
import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.ConsumerImpl;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
/**
*/
public class BrokerServiceThrottlingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
* Verifies: updating zk-thottling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
*
* @throws Exception
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
BrokerService service = pulsar.getBrokerService();
assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
Thread.sleep(1000);
assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
}
/**
* Broker has maxConcurrentLookupRequest = 0 so, it rejects incoming lookup request and it cause consumer creation
* failure.
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBroker0Permit() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);
ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
Consumer consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig);
consumer.close();
int newPermits = 0;
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits));
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
try {
consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig);
consumer.close();
fail("It should fail as throttling should not receive any request");
} catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) {
// ok as throttling set to 0
}
}
/**
* Verifies: Broker side throttling:
*
* <pre>
* 1. concurrent_consumer_creation > maxConcurrentLookupRequest at broker
* 2. few of the consumer creation must fail with TooManyLookupRequestException.
* </pre>
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBroker() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setIoThreads(20);
clientConf.setConnectionsPerBroker(20);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);
ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
consumerConfig.setSubscriptionType(SubscriptionType.Shared);
int newPermits = 1;
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits));
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
List<Consumer> successfulConsumers = Lists.newArrayList();
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 20;
CountDownLatch latch = new CountDownLatch(totalConsumers);
for (int i = 0; i < totalConsumers; i++) {
executor.execute(() -> {
try {
successfulConsumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig));
} catch (PulsarClientException.TooManyLookupRequestException e) {
// ok
} catch (Exception e) {
fail("it shouldn't failed");
}
latch.countDown();
});
}
latch.await();
for (int i = 0; i < successfulConsumers.size(); i++) {
successfulConsumers.get(i).close();
}
pulsarClient.close();
assertNotEquals(successfulConsumers.size(), totalConsumers);
}
/**
* This testcase make sure that once consumer lost connection with broker, it always reconnects with broker by
* retrying on throttling-error exception also.
*
* <pre>
* 1. all consumers get connected
* 2. broker restarts with maxConcurrentLookupRequest = 1
* 3. consumers reconnect and some get TooManyRequestException and again retries
* 4. eventually all consumers will successfully connect to broker
* </pre>
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setIoThreads(20);
clientConf.setConnectionsPerBroker(20);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);
upsertLookupPermits(100);
ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
consumerConfig.setSubscriptionType(SubscriptionType.Shared);
List<Consumer> consumers = Lists.newArrayList();
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 8;
CountDownLatch latch = new CountDownLatch(totalConsumers);
for (int i = 0; i < totalConsumers; i++) {
executor.execute(() -> {
try {
consumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig));
} catch (PulsarClientException.TooManyLookupRequestException e) {
// ok
} catch (Exception e) {
fail("it shouldn't failed");
}
latch.countDown();
});
}
latch.await();
stopBroker();
conf.setMaxConcurrentLookupRequest(1);
startBroker();
// wait for consumer to reconnect
Thread.sleep(3000);
int totalConnectedConsumers = 0;
for (int i = 0; i < consumers.size(); i++) {
if (((ConsumerImpl) consumers.get(i)).isConnected()) {
totalConnectedConsumers++;
}
consumers.get(i).close();
}
assertEquals(totalConnectedConsumers, totalConsumers);
pulsarClient.close();
}
private void upsertLookupPermits(int permits) throws Exception {
Map<String, String> throttlingMap = Maps.newHashMap();
throttlingMap.put("maxConcurrentLookupRequest", Integer.toString(permits));
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(throttlingMap);
if (mockZookKeeper.exists(BROKER_SERVICE_CONFIGURATION_PATH, false) != null) {
mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
} else {
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
private void setWatchOnThrottlingZnode() throws Exception {
Method updateConfigListenerMethod = BrokerService.class
.getDeclaredMethod("updateConfigurationAndRegisterListeners");
updateConfigListenerMethod.setAccessible(true);
updateConfigListenerMethod.invoke(pulsar.getBrokerService());
}
}
\ No newline at end of file
......@@ -413,6 +413,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.BrokerPersistenceException(errorMsg);
case ServiceNotReady:
return new PulsarClientException.LookupException(errorMsg);
case TooManyRequests:
return new PulsarClientException.TooManyLookupRequestException(errorMsg);
case ProducerBlockedQuotaExceededError:
return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg);
case ProducerBlockedQuotaExceededException:
......
......@@ -68,6 +68,7 @@ public final class PulsarApi {
TopicNotFound(11, 11),
SubscriptionNotFound(12, 12),
ConsumerNotFound(13, 13),
TooManyRequests(14, 14),
;
public static final int UnknownError_VALUE = 0;
......@@ -84,6 +85,7 @@ public final class PulsarApi {
public static final int TopicNotFound_VALUE = 11;
public static final int SubscriptionNotFound_VALUE = 12;
public static final int ConsumerNotFound_VALUE = 13;
public static final int TooManyRequests_VALUE = 14;
public final int getNumber() { return value; }
......@@ -104,6 +106,7 @@ public final class PulsarApi {
case 11: return TopicNotFound;
case 12: return SubscriptionNotFound;
case 13: return ConsumerNotFound;
case 14: return TooManyRequests;
default: return null;
}
}
......
......@@ -81,6 +81,7 @@ enum ServerError {
TopicNotFound = 11; // Topic not found
SubscriptionNotFound = 12; // Subscription not found
ConsumerNotFound = 13; // Consumer not found
TooManyRequests = 14; // Error with too many simultaneously request
}
enum AuthMethod {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册