diff --git a/conf/broker.conf b/conf/broker.conf index eae9cc7ea1cb041cbb9c1b96815f015396e429a2..384bd1136f2b8890622a382719e18b132608dc57 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -226,6 +226,11 @@ dispatchThrottlingRatePerReplicatorInMsg=0 # Using a value of 0, is disabling replication message-byte dispatch-throttling dispatchThrottlingRatePerReplicatorInByte=0 +# Dispatch rate-limiting relative to publish rate. +# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate: +# throttle-dispatch-rate = (publish-rate + configured dispatch-rate). +dispatchThrottlingRateRelativeToPublishRate=false + # By default we enable dispatch-throttling for both caught up consumers as well as consumers who have # backlog. dispatchThrottlingOnNonBacklogConsumerEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 6efd1cd585dbac8ffa7bfee4acf4b0f057610466..6d1023bb4027497be3105aa9a32c3cbe20f9f6ff 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -169,6 +169,11 @@ dispatchThrottlingRatePerTopicInMsg=0 # default message-byte dispatch-throttling dispatchThrottlingRatePerTopicInByte=0 +# Dispatch rate-limiting relative to publish rate. +# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate: +# throttle-dispatch-rate = (publish-rate + configured dispatch-rate). +dispatchThrottlingRateRelativeToPublishRate=false + # By default we enable dispatch-throttling for both caught up consumers as well as consumers who have # backlog. dispatchThrottlingOnNonBacklogConsumerEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 70c70003a581ecfe25adc42090d944531adf53b0..46695bdf1345eff6ca6b95ddedc00017a1954d54 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -453,7 +453,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n" + "Using a value of 0, is disabling replication message-byte dispatch-throttling") private long dispatchThrottlingRatePerReplicatorInByte = 0; - + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Dispatch rate-limiting relative to publish rate. (Enabling flag will make broker to dynamically " + + "update dispatch-rate relatively to publish-rate: " + + "throttle-dispatch-rate = (publish-rate + configured dispatch-rate) ") + private boolean dispatchThrottlingRateRelativeToPublishRate = false; @FieldContext( dynamic = true, category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 550ad3867d7dade859d9fe55d363001541e779c7..3f437ddc81566178651c306f2912e0c6696738fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -138,7 +138,8 @@ public class DispatchRateLimiter { dispatchThrottlingRateInByte = -1; } - return new DispatchRate(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, 1); + return new DispatchRate(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, 1, + config.isDispatchThrottlingRateRelativeToPublishRate()); } /**