From 8933d8ddffe649e3e45458005fae5a5c6a3de47a Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 9 Sep 2020 09:06:58 +0800 Subject: [PATCH] Fix dispatchRate is overwritten (#8004) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #7863 ### Motivation 1) Topic level policy will be overwritten by namespace level 2) After removing the topic level policy, `DispatchLimiter` does not take effect, the namespace level policy should be used ### Modifications 1)If there is a topic-level policy, it will not be overwritten by the namespace-level policy when the policy is updated 2)When removing topic-level policy, namespace-level policy will be used. If the namespace-level policy does not exist, the broker level will be used ### Verifying this change unit test: TopicPoliciesTest#testPolicyOverwrittenByNamespaceLevel --- .../service/persistent/PersistentTopic.java | 36 +++++++++------- .../broker/admin/TopicPoliciesTest.java | 42 +++++++++++++++++++ .../service/InactiveTopicDeleteTest.java | 6 +-- 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index becd0943b30..292e0709c25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1284,10 +1284,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return subscriptions; } + @Override public PersistentSubscription getSubscription(String subscriptionName) { return subscriptions.get(subscriptionName); } + @Override public ConcurrentOpenHashMap getReplicators() { return replicators; } @@ -1858,7 +1860,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled()); } - initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data)); this.updateMaxPublishRate(data); @@ -1883,7 +1884,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal CompletableFuture persistentPoliciesFuture = checkPersistencePolicies(); // update rate-limiter if policies updated if (this.dispatchRateLimiter.isPresent()) { - dispatchRateLimiter.get().onPoliciesUpdate(data); + if (topicPolicies == null || !topicPolicies.isDispatchRateSet()) { + dispatchRateLimiter.get().onPoliciesUpdate(data); + } } if (this.subscribeRateLimiter.isPresent()) { subscribeRateLimiter.get().onPoliciesUpdate(data); @@ -2375,10 +2378,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } Optional namespacePolicies = getNamespacePolicies(); initializeTopicDispatchRateLimiterIfNeeded(policies); - if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) { - dispatchRateLimiter.ifPresent(dispatchRateLimiter -> - dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate())); - } + + dispatchRateLimiter.ifPresent(limiter -> { + if (policies.isDispatchRateSet()) { + dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate()); + } else { + dispatchRateLimiter.get().updateDispatchRate(); + } + }); if (policies.getPublishRate() != null) { topicPolicyPublishRate = policies.getPublishRate(); @@ -2387,17 +2394,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (policies.isInactiveTopicPoliciesSet()) { inactiveTopicPolicies = policies.getInactiveTopicPolicies(); + } else if (namespacePolicies.isPresent() && namespacePolicies.get().inactive_topic_policies != null) { + //topic-level policies is null , so use namespace-level + inactiveTopicPolicies = namespacePolicies.get().inactive_topic_policies; } else { - //topic-level policies is null , so use namespace-level or broker-level - namespacePolicies.ifPresent(nsPolicies -> { - if (nsPolicies.inactive_topic_policies != null) { - inactiveTopicPolicies = nsPolicies.inactive_topic_policies; - } else { - ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration(); - resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode() - , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled()); - } - }); + //namespace-level policies is null , so use broker level + ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration(); + resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode() + , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 4a17d7085e3..684a1028ee8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BacklogQuotaManager; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -44,6 +45,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.UUID; + @Slf4j public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @@ -516,6 +519,45 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(testTopic, true); } + @Test(timeOut = 20000) + public void testPolicyOverwrittenByNamespaceLevel() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + //wait for cache init + Thread.sleep(2000); + DispatchRate dispatchRate = new DispatchRate(200, 20000, 1, true); + admin.namespaces().setDispatchRate(myNamespace, dispatchRate); + //wait for zk + Thread.sleep(2000); + dispatchRate = new DispatchRate(100, 10000, 1, true); + admin.topics().setDispatchRate(topic, dispatchRate); + for (int i = 0; i < 10; i++) { + if (admin.topics().getDispatchRate(topic) != null) { + break; + } + Thread.sleep(500); + } + //1 Set ns level policy, topic level should not be overwritten + dispatchRate = new DispatchRate(300, 30000, 2, true); + admin.namespaces().setDispatchRate(myNamespace, dispatchRate); + //wait for zk + Thread.sleep(1000); + DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get(); + Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000); + Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100); + admin.topics().removeDispatchRate(topic); + for (int i = 0; i < 10; i++) { + if (admin.topics().getDispatchRate(topic) == null) { + break; + } + Thread.sleep(500); + } + //2 Remove level policy ,DispatchRateLimiter should us ns level policy + limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get(); + Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000); + Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300); + } + @Test public void testGetSetCompactionThreshold() throws Exception { long compactionThreshold = 100000; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 79a296e8782..9969efad3f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -303,11 +303,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { conf.setSystemTopicEnabled(true); conf.setTopicLevelPoliciesEnabled(true); super.baseSetup(); - final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(topicName, 3); - //wait for cache init - Thread.sleep(2000); + InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName); assertNull(inactiveTopicPolicies); @@ -316,6 +314,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); policies.setMaxInactiveDurationSeconds(10); admin.topics().setInactiveTopicPolicies(topicName, policies); + //wait for init + Thread.sleep(3000); for (int i = 0; i < 50; i++) { if (admin.topics().getInactiveTopicPolicies(topicName) != null) { break; -- GitLab