未验证 提交 9c694e5a 编写于 作者: F feynmanlin 提交者: GitHub

Support set MaxUnackMessagesPerSubscription on topic level (#7802)

Motivation
support set MaxUnackMessagesPerSubscription on topic level

Modifications
Support set/get/remove MaxUnackMessagesPerSubscription policy on topic level.

Verifying this change
Added Unit test to verify set/get/remove MaxUnackMessagesPerSubscription policy at Topic level work as expected when Topic level policy is enabled/disabled

org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnSubscriptionApi
org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnSubscription
上级 48f5a2f6
......@@ -800,6 +800,21 @@ public class PersistentTopicsBase extends AdminResource {
}
}
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}
private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
Topic topic;
try {
......
......@@ -250,6 +250,71 @@ public class PersistentTopics extends PersistentTopicsBase {
internalCreateNonPartitionedTopic(authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Set max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Delete max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setMaxUnackedMessagesOnSubscription(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Get delayed delivery messages config on a topic.")
......
......@@ -350,7 +350,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.maxUnackedMessagesOnSubscription);
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
......@@ -694,7 +694,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
@Override
public void addUnAckedMessages(int numberOfMessages) {
int maxUnackedMessages = topic.maxUnackedMessagesOnSubscription;
int maxUnackedMessages = topic.getMaxUnackedMessagesOnSubscription();
if (maxUnackedMessages == -1) {
maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
......
......@@ -186,7 +186,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;
public volatile int maxUnackedMessagesOnSubscription = -1;
private volatile int maxUnackedMessagesOnSubscription = -1;
private volatile boolean isClosingOrDeleting = false;
private static class TopicStatsHelper {
......@@ -2325,4 +2325,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
return delayedDeliveryEnabled;
}
public int getMaxUnackedMessagesOnSubscription() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
return topicPolicies.getMaxUnackedMessagesOnSubscription();
}
return maxUnackedMessagesOnSubscription;
}
}
......@@ -44,8 +44,6 @@ import static org.testng.Assert.*;
@Slf4j
public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(AdminApiMaxUnackedMessages.class);
@BeforeMethod
@Override
public void setup() throws Exception {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.testng.Assert.fail;
public class MaxUnackedMessagesTest extends ProducerConsumerBase {
private final String testTenant = "public";
private final String testNamespace = "default";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";
@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test(timeOut = 10000)
public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max = admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
assertNull(max);
admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 2048);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
break;
}
Thread.sleep(100);
}
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName).intValue(), 2048);
admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null) {
break;
}
Thread.sleep(100);
}
assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
}
// See https://github.com/apache/pulsar/issues/5438
@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnSubscription() throws Exception {
final String topicName = testTopic + System.currentTimeMillis();
final String subscriberName = "test-sub" + System.currentTimeMillis();
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 200;
pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
List<Consumer<?>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnSubscription(topicName, unackMsgAllowed);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
break;
}
Thread.sleep(100);
}
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
Message<?> msg = null;
Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(msg, consumers.get(i));
} else {
break;
}
}
}
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages: check
// delta as 3 consumers with receiverQueueSize = 10
Assert.assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 3);
// start acknowledging messages
messages.forEach((m, c) -> {
try {
c.acknowledge(m);
} catch (PulsarClientException e) {
fail("ack failed", e);
}
});
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
Set<MessageId> result = ConcurrentHashMap.newKeySet();
// expecting messages which are not received
int expectedRemainingMessages = totalProducedMsgs - messages.size();
CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
for (int i = 0; i < consumers.size(); i++) {
final int consumerCount = i;
for (int j = 0; j < totalProducedMsgs; j++) {
consumers.get(i).receiveAsync().thenAccept(m -> {
result.add(m.getMessageId());
try {
consumers.get(consumerCount).acknowledge(m);
} catch (PulsarClientException e) {
fail("failed to ack msg", e);
}
latch.countDown();
});
}
}
latch.await(10, TimeUnit.SECONDS);
// total received-messages should match to produced messages (it may have duplicate messages)
Assert.assertEquals(result.size(), expectedRemainingMessages);
producer.close();
consumers.forEach(c -> {
try {
c.close();
} catch (PulsarClientException e) {
}
});
}
private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
try {
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
break;
} catch (Exception e) {
//ignore
Thread.sleep(100);
}
if (i == 49) {
throw new RuntimeException("Waiting for cache initialization has timed out");
}
}
}
}
......@@ -1663,4 +1663,49 @@ public interface Topics {
* Topic name
*/
CompletableFuture<Void> removeRetentionAsync(String topic);
/**
* get max unacked messages on subscription of a topic.
* @param topic
* @return
* @throws PulsarAdminException
*/
Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException;
/**
* get max unacked messages on subscription of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic);
/**
* set max unacked messages on subscription of a topic.
* @param topic
* @param maxNum
* @throws PulsarAdminException
*/
void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException;
/**
* set max unacked messages on subscription of a topic asynchronously.
* @param topic
* @param maxNum
* @return
*/
CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum);
/**
* remove max unacked messages on subscription of a topic.
* @param topic
* @throws PulsarAdminException
*/
void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException;
/**
* remove max unacked messages on subscription of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic);
}
......@@ -1510,6 +1510,84 @@ public class TopicsImpl extends BaseResource implements Topics {
}
}
@Override
public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
try {
return getMaxUnackedMessagesOnSubscriptionAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxNum) {
future.complete(maxNum);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException {
try {
setMaxUnackedMessagesOnSubscriptionAsync(topic, maxNum).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
return asyncPostRequest(path, Entity.entity(maxNum, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
try {
removeMaxUnackedMessagesOnSubscriptionAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
return asyncDeleteRequest(path);
}
@Override
public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
try {
......
......@@ -54,6 +54,11 @@ public class TopicPolicies {
public boolean isDelayedDeliveryEnabledSet(){
return delayedDeliveryEnabled != null;
}
private Integer maxUnackedMessagesOnSubscription = null;
public boolean isMaxUnackedMessagesOnSubscriptionSet() {
return maxUnackedMessagesOnSubscription != null;
}
public boolean isBacklogQuotaSet() {
return !backLogQuotaMap.isEmpty();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册