diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0123aa76fa2778adbdd71bfbff4efec5bb1a886e..a78ead774427c660ab78aa51c3e97d4e84bb8e13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3100,4 +3100,46 @@ public class PersistentTopicsBase extends AdminResource { } + protected Optional internalGetCompactionThreshold() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold); + } + + protected CompletableFuture internalSetCompactionThreshold(Long compactionThreshold) { + if (compactionThreshold != null && compactionThreshold < 0) { + throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold"); + } + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + + TopicPolicies topicPolicies = getTopicPolicies(topicName) + .orElseGet(TopicPolicies::new); + topicPolicies.setCompactionThreshold(compactionThreshold); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + + protected CompletableFuture internalRemoveCompactionThreshold() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + Optional topicPolicies = getTopicPolicies(topicName); + if (!topicPolicies.isPresent()) { + return CompletableFuture.completedFuture(null); + } + topicPolicies.get().setCompactionThreshold(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c00f2700d2ceb42f536ee25882bb7226293796d4..c98867a9553d20937a7f1ea217441b20988f1025 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1784,5 +1784,92 @@ public class PersistentTopics extends PersistentTopicsBase { }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/compactionThreshold") + @ApiOperation(value = "Get compaction threshold configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional compactionThreshold = internalGetCompactionThreshold(); + if (!compactionThreshold.isPresent()) { + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(compactionThreshold.get()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/compactionThreshold") + @ApiOperation(value = "Set compaction threshold configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Failed to set topic dispatch rate", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed to set topic dispatch rate"); + asyncResponse.resume(new RestException(ex)); + } else { + try { + log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + jsonMapper().writeValueAsString(compactionThreshold)); + } catch (JsonProcessingException ignore) {} + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/compactionThreshold") + @ApiOperation(value = "Remove compaction threshold configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalRemoveCompactionThreshold().whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove topic dispatch rate", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3899313285f6e4d3a5e953501d348800fcc1f3a2..7ea58e58aded8bbb0ce3eba68a8657e238fe3965 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1155,12 +1155,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public void checkCompaction() { TopicName name = TopicName.get(topic); try { - Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); + Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name)) + .map(TopicPolicies::getCompactionThreshold) + .orElse(null); + if (compactionThreshold == null) { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, name.getNamespace())) + .orElseThrow(() -> new KeeperException.NoNodeException()); + compactionThreshold = policies.compaction_threshold; + } - if (isSystemTopic() || policies.compaction_threshold != 0 + if (isSystemTopic() || compactionThreshold != 0 && currentCompaction.isDone()) { long backlogEstimate = 0; @@ -1173,13 +1179,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal backlogEstimate = ledger.getEstimatedBacklogSize(); } - if (backlogEstimate > policies.compaction_threshold) { + if (backlogEstimate > compactionThreshold) { try { triggerCompaction(); } catch (AlreadyRunningException are) { log.debug("[{}] Compaction already running, so don't trigger again, " + "even though backlog({}) is over threshold({})", - name, backlogEstimate, policies.compaction_threshold); + name, backlogEstimate, compactionThreshold); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java index 329c3c7d2c6d09549284610e6b1fb5409ad29d6c..fa073fa004381e1c51c7dac28a6abfcaf71c460a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java @@ -150,4 +150,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(e.getStatusCode(), 405); } } + + @Test + public void testCompactionThresholdDisabled() { + Long compactionThreshold = 10000L; + log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); + + try { + admin.topics().setCompactionThreshold(testTopic, compactionThreshold); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + + try { + admin.topics().getCompactionThreshold(testTopic); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index ef16adb758ee366f73e21d030bb0c53f6ebc0e23..e2166618993a3cdbfc4caabb29c084cd9d9a819f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -377,4 +377,42 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(testTopic, true); } + + @Test + public void testGetSetCompactionThreshold() throws Exception { + long compactionThreshold = 100000; + log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); + + admin.topics().setCompactionThreshold(testTopic, compactionThreshold); + log.info("Compaction threshold set success on topic: {}", testTopic); + + Thread.sleep(3000); + long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic); + log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic); + Assert.assertEquals(getCompactionThreshold, compactionThreshold); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testRemoveCompactionThreshold() throws Exception { + Long compactionThreshold = 100000L; + log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); + + admin.topics().setCompactionThreshold(testTopic, compactionThreshold); + log.info("Compaction threshold set success on topic: {}", testTopic); + + Thread.sleep(3000); + Long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic); + log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic); + Assert.assertEquals(getCompactionThreshold, compactionThreshold); + + admin.topics().removeCompactionThreshold(testTopic); + Thread.sleep(3000); + log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic); + getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic); + Assert.assertNull(getCompactionThreshold); + + admin.topics().deletePartitionedTopic(testTopic, true); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 93d31928e2f76101c5cc91aaae9a333cc86a9a39..6ca239084eaed773ae6a7fc3fdcd7a00eb5d1fe7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1913,4 +1913,98 @@ public interface Topics { */ CompletableFuture removeDispatchRateAsync(String topic) throws PulsarAdminException; + /** + * Get the compactionThreshold for a topic. The maximum number of bytes + * can have before compaction is triggered. 0 disables. + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param topic + * Topic name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + Long getCompactionThreshold(String topic) throws PulsarAdminException; + + /** + * Get the compactionThreshold for a topic asynchronously. The maximum number of bytes + * can have before compaction is triggered. 0 disables. + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param topic + * Topic name + */ + CompletableFuture getCompactionThresholdAsync(String topic); + + /** + * Set the compactionThreshold for a topic. The maximum number of bytes + * can have before compaction is triggered. 0 disables. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param topic + * Topic name + * @param compactionThreshold + * maximum number of backlog bytes before compaction is triggered + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException; + + /** + * Set the compactionThreshold for a topic asynchronously. The maximum number of bytes + * can have before compaction is triggered. 0 disables. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param topic + * Topic name + * @param compactionThreshold + * maximum number of backlog bytes before compaction is triggered + */ + CompletableFuture setCompactionThresholdAsync(String topic, long compactionThreshold); + + /** + * Remove the compactionThreshold for a topic. + * @param topic + * Topic name + * @throws PulsarAdminException + * Unexpected error + */ + void removeCompactionThreshold(String topic) throws PulsarAdminException; + + /** + * Remove the compactionThreshold for a topic asynchronously. + * @param topic + * Topic name + */ + CompletableFuture removeCompactionThresholdAsync(String topic); + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index ffd9339f1ea8c6022056a32bf1df8ee25ae4908d..ce4ce787783b25562b633b5512bf1eae853241a9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2003,6 +2003,81 @@ public class TopicsImpl extends BaseResource implements Topics { return asyncDeleteRequest(path); } + @Override + public Long getCompactionThreshold(String topic) throws PulsarAdminException { + try { + return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getCompactionThresholdAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "compactionThreshold"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Long compactionThreshold) { + future.complete(compactionThreshold); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException { + try { + setCompactionThresholdAsync(topic, compactionThreshold).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setCompactionThresholdAsync(String topic, long compactionThreshold) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "compactionThreshold"); + return asyncPostRequest(path, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeCompactionThreshold(String topic) throws PulsarAdminException { + try { + removeCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeCompactionThresholdAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "compactionThreshold"); + return asyncDeleteRequest(path); + } - private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index b84794ee416649b5e4c81c832f93cd759c6fc550..5b195762a209f7fe39728863d246acda7702fd6b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -123,6 +123,9 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("get-dispatch-rate", new GetDispatchRate()); jcommander.addCommand("set-dispatch-rate", new SetDispatchRate()); jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate()); + jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); + jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); + jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold()); } @Parameters(commandDescription = "Get the list of topics under a namespace.") @@ -259,17 +262,17 @@ public class CmdTopics extends CmdBase { @Parameters(commandDescription = "Create a non-partitioned topic.") private class CreateNonPartitionedCmd extends CliCommand { - + @Parameter(description = "persistent://tenant/namespace/topic\n", required = true) private java.util.List params; - + @Override void run() throws Exception { String topic = validateTopicName(params); topics.createNonPartitionedTopic(topic); } } - + @Parameters(commandDescription = "Update existing non-global partitioned topic. \n" + "\t\tNew updating number of partitions must be greater than existing number of partitions.") private class UpdatePartitionedCmd extends CliCommand { @@ -1162,4 +1165,46 @@ public class CmdTopics extends CmdBase { admin.topics().removeDispatchRate(persistentTopic); } } + + @Parameters(commandDescription = "Get compaction threshold for a topic") + private class GetCompactionThreshold extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getCompactionThreshold(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set compaction threshold for a topic") + private class SetCompactionThreshold extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--threshold", "-t" }, + description = "Maximum number of bytes in a topic backlog before compaction is triggered " + + "(eg: 10M, 16G, 3T). 0 disables automatic compaction", + required = true) + private String threshold = "0"; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setCompactionThreshold(persistentTopic, validateSizeString(threshold)); + } + } + + @Parameters(commandDescription = "Remove compaction threshold for a topic") + private class RemoveCompactionThreshold extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeCompactionThreshold(persistentTopic); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 0e459c67742cb2fca8ca4072f6869416b76c0408..d56e283b5d0da50f030d6234a24126d61494ab0a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -50,6 +50,7 @@ public class TopicPolicies { private Long delayedDeliveryTickTimeMillis = null; private Boolean delayedDeliveryEnabled = null; private DispatchRate dispatchRate = null; + private Long compactionThreshold = null; public boolean isMaxUnackedMessagesOnConsumerSet() { return maxUnackedMessagesOnConsumer != null; @@ -102,4 +103,8 @@ public class TopicPolicies { public boolean isDispatchRateSet() { return dispatchRate != null; } + + public boolean isCompactionThresholdSet() { + return compactionThreshold != null; + } }