未验证 提交 fdc3a9bc 编写于 作者: K Kai 提交者: GitHub

[Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)

Fixes #5395 

### Motivation

This change introduces a new namespace policy `autoTopicCreationOverride`, which will enable an override of broker `autoTopicCreation` settings on the namespace level. You may keep `autoTopicCreation` disabled for the broker and allow it on a specific namespace using this feature.

### Modifications

- Add new namespace policy: `autoTopicCreationOverride` and associated API / CLI interface for setting and removing. Defaults to non-partitioned type, but also allows partitioned topics.
- Modifies BrokerService: when checking `autoTopicCreation` configuration, the broker first retrieves namespace policies from zookeeper. If the `autoTopicCreationOverride` policy exists for that namespace then it uses those settings. If not, falls back to broker configuration.
- Slight refactor to move `TopicType` enum to pulsar-common and add `autoTopicCreationOverride` to pulsar-common.
上级 ed80a936
......@@ -35,6 +35,7 @@ import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
......@@ -1538,18 +1539,4 @@ public class ServiceConfiguration implements PulsarConfiguration {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}
enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
private String type;
TopicType(String type) {
this.type = type;
}
public String toString() {
return type;
}
}
}
......@@ -69,6 +69,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
......@@ -553,6 +554,105 @@ public abstract class NamespacesBase extends AdminResource {
}
}
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}
protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = null;
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
return null;
} catch (KeeperException.NoNodeException e) {
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
namespaceName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
} catch (KeeperException.BadVersionException e) {
log.error(
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return null;
} catch (Exception e) {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}
protected void internalModifyDeduplication(boolean enableDeduplication) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
......@@ -573,17 +673,17 @@ public abstract class NamespacesBase extends AdminResource {
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(),
log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification",
"[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e);
log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
......
......@@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
......@@ -299,6 +300,43 @@ public class Namespaces extends NamespacesBase {
internalModifyDeduplication(enableDeduplication);
}
@POST
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e ) {
asyncResponse.resume(new RestException(e));
}
}
@DELETE
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Remove override of broker's allowAutoTopicCreation in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("/{tenant}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
......
......@@ -122,6 +122,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
......@@ -131,6 +132,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
......@@ -658,7 +660,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
}
public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
......@@ -1846,8 +1848,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
// If topic is already exist, creating partitioned topic is not allowed.
if (metadata.partitions == 0
&& !topicExists
&& pulsar.getConfiguration().isAllowAutoTopicCreation()
&& pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
} else {
return CompletableFuture.completedFuture(metadata);
......@@ -1858,7 +1860,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
......@@ -2091,4 +2093,53 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}
public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
}
public boolean isAllowAutoTopicCreation(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.allowAutoTopicCreation;
} else {
return pulsar.getConfiguration().isAllowAutoTopicCreation();
}
}
public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.topicType);
} else {
return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
}
}
public int getDefaultNumPartitions(final TopicName topicName) {
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.defaultNumPartitions;
} else {
return pulsar.getConfiguration().getDefaultNumPartitions();
}
}
private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", topicName, t.getMessage(), t);
return null;
}
log.warn("No autoTopicCreateOverride policy found for {}", topicName);
return null;
}
}
......@@ -805,7 +805,7 @@ public class ServerCnx extends PulsarHandler {
}
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.pulsar().getConfig().isAllowAutoTopicCreation();
&& service.isAllowAutoTopicCreation(topicName.toString());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
......
......@@ -101,7 +101,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
this.conf.setAllowAutoTopicCreationType("non-persistent");
this.conf.setAllowAutoTopicCreationType("non-partitioned");
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setWebServicePort(Optional.of(0));
......
......@@ -24,8 +24,11 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
......@@ -43,17 +46,35 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
super.internalCleanup();
}
@AfterMethod
protected void cleanupTest() throws Exception {
pulsar.getAdminClient().namespaces().removeAutoTopicCreation("prop/ns-abc");
}
@Test
public void testAutoNonPartitionedTopicCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
final String topicName = "persistent://prop/ns-abc/non-partitioned-topic";
final String topicString = "persistent://prop/ns-abc/non-partitioned-topic";
final String subscriptionName = "non-partitioned-topic-sub";
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@Test
public void testAutoNonPartitionedTopicCreationOnProduce() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
final String topicString = "persistent://prop/ns-abc/non-partitioned-topic-2";
pulsarClient.newProducer().topic(topicString).create();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@Test
......@@ -62,13 +83,28 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
final String topicName = "persistent://prop/ns-abc/partitioned-topic";
final String topicString = "persistent://prop/ns-abc/partitioned-topic";
final String subscriptionName = "partitioned-topic-sub";
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
@Test
public void testAutoPartitionedTopicCreationOnProduce() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
final String topicString = "persistent://prop/ns-abc/partitioned-topic-1";
pulsarClient.newProducer().topic(topicString).create();
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
......@@ -76,15 +112,15 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
public void testAutoTopicCreationDisable() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String topicName = "persistent://prop/ns-abc/test-topic";
final String topicString = "persistent://prop/ns-abc/test-topic";
final String subscriptionName = "test-topic-sub";
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
......@@ -93,16 +129,16 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
final String topicName = "persistent://prop/ns-abc/test-topic-2";
final String topicString = "persistent://prop/ns-abc/test-topic-2";
final String subscriptionName = "partitioned-topic-sub";
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
admin.topics().createNonPartitionedTopic(topicString);
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
......@@ -158,9 +194,130 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
final String topicName = "persistent://prop/ns-abc/test-topic-3";
int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
final String topicString = "persistent://prop/ns-abc/test-topic-3";
int partitions = admin.topics().getPartitionedTopicMetadata(topicString).partitions;
assertEquals(partitions, 0);
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
public void testAutoCreationNamespaceAllowOverridesBroker() throws Exception {
final String topicString = "persistent://prop/ns-abc/test-topic-4";
final String subscriptionName = "test-topic-sub-4";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@Test
public void testAutoCreationNamespaceDisallowOverridesBroker() throws Exception {
final String topicString = "persistent://prop/ns-abc/test-topic-5";
final String subscriptionName = "test-topic-sub-5";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(false, null, null));
try {
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
public void testAutoCreationNamespaceOverrideAllowsPartitionedTopics() throws Exception {
final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-6";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 4));
final String subscriptionName = "test-topic-sub-6";
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 4; i++) {
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
@Test
public void testAutoCreationNamespaceOverridesTopicTypePartitioned() throws Exception {
final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-7";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 3));
final String subscriptionName = "test-topic-sub-7";
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
@Test
public void testAutoCreationNamespaceOverridesTopicTypeNonPartitioned() throws Exception {
final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-8";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(2);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
final String subscriptionName = "test-topic-sub-8";
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@Test
public void testAutoCreationNamespaceOverridesDefaultNumPartitions() throws Exception {
final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-9";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(2);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 4));
final String subscriptionName = "test-topic-sub-9";
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 4; i++) {
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
@Test
public void testAutoCreationNamespaceAllowOverridesBrokerOnProduce() throws Exception {
final String topicString = "persistent://prop/ns-abc/test-topic-10";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
pulsarClient.newProducer().topic(topicString).create();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
}
......@@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
......@@ -936,6 +937,56 @@ public interface Namespaces {
*/
CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication);
/**
* Sets the autoTopicCreation policy for a given namespace, overriding broker settings
* <p/>
* When autoTopicCreationOverride is enabled, new topics will be created upon connection,
* regardless of the broker level configuration.
* <p/>
* Request example:
*
* <pre>
* <code>
* {
* "allowAutoTopicCreation" : true,
* "topicType" : "partitioned",
* "defaultNumPartitions": 2
* }
* </code>
* </pre>
*
* @param namespace
* Namespace name
* @param autoTopicCreationOverride
* Override policies for auto topic creation
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void setAutoTopicCreation(String namespace, AutoTopicCreationOverride autoTopicCreationOverride)
throws PulsarAdminException;
/**
* Removes the autoTopicCreation policy for a given namespace,
* allowing the broker to dictate the auto-creation policy.
* <p/>
*
* @param namespace
* Namespace name
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void removeAutoTopicCreation(String namespace) throws PulsarAdminException;
/**
* Get the bundles split data.
*
......
......@@ -38,6 +38,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
......@@ -737,6 +738,30 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
return asyncPostRequest(path, Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON));
}
@Override
public void setAutoTopicCreation(String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "autoTopicCreation");
request(path).post(Entity.entity(autoTopicCreationOverride,
MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void removeAutoTopicCreation(String namespace) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "autoTopicCreation");
request(path).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException {
try {
......
......@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
......@@ -72,6 +73,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
......@@ -355,6 +357,13 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true);
namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned"));
verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1"));
verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1");
namespaces.run(split("get-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
......
......@@ -37,6 +37,7 @@ import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
......@@ -51,6 +52,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations about namespaces")
......@@ -378,6 +380,60 @@ public class CmdNamespaces extends CmdBase {
}
}
@Parameters(commandDescription = "Enable or disable autoTopicCreation for a namespace, overriding broker settings")
private class SetAutoTopicCreation extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable", "-e" }, description = "Enable allowAutoTopicCreation on namespace")
private boolean enable = false;
@Parameter(names = { "--disable", "-d" }, description = "Disable allowAutoTopicCreation on namespace")
private boolean disable = false;
@Parameter(names = { "--type", "-t" }, description = "Type of topic to be auto-created. " +
"Possible values: (partitioned, non-partitioned). Default value: non-partitioned")
private String type = "non-partitioned";
@Parameter(names = { "--num-partitions", "-n" }, description = "Default number of partitions of topic to be auto-created," +
" applicable to partitioned topics only", required = false)
private Integer defaultNumPartitions = null;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
type = type.toLowerCase().trim();
if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}
if (enable) {
if (!TopicType.isValidTopicType(type)) {
throw new ParameterException("Must specify type of topic to be created. " +
"Possible values: (partitioned, non-partitioned)");
}
if (TopicType.PARTITIONED.toString().equals(type) && !(defaultNumPartitions > 0)) {
throw new ParameterException("Must specify num-partitions > 0 for partitioned topic type.");
}
}
admin.namespaces().setAutoTopicCreation(namespace, new AutoTopicCreationOverride(enable, type, defaultNumPartitions));
}
}
@Parameters(commandDescription = "Remove override of autoTopicCreation for a namespace")
private class RemoveAutoTopicCreation extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().removeAutoTopicCreation(namespace);
}
}
@Parameters(commandDescription = "Set the retention policy for a namespace")
private class SetRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
......@@ -1509,6 +1565,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-deduplication", new SetDeduplication());
jcommander.addCommand("set-auto-topic-creation", new SetAutoTopicCreation());
jcommander.addCommand("remove-auto-topic-creation", new RemoveAutoTopicCreation());
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;
import com.google.common.base.MoreObjects;
import java.util.Objects;
/**
* Override of autoTopicCreation settings on a namespace level.
*/
public class AutoTopicCreationOverride {
public boolean allowAutoTopicCreation;
public String topicType;
public Integer defaultNumPartitions;
public AutoTopicCreationOverride() {
}
public AutoTopicCreationOverride(boolean allowAutoTopicCreation, String topicType,
Integer defaultNumPartitions) {
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.topicType = topicType;
this.defaultNumPartitions = defaultNumPartitions;
}
@Override
public int hashCode() {
return Objects.hash(allowAutoTopicCreation, topicType, defaultNumPartitions);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof AutoTopicCreationOverride) {
AutoTopicCreationOverride other = (AutoTopicCreationOverride) obj;
return Objects.equals(this.allowAutoTopicCreation, other.allowAutoTopicCreation)
&& Objects.equals(this.topicType, other.topicType)
&& Objects.equals(this.defaultNumPartitions, other.defaultNumPartitions);
}
return false;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("allowAutoTopicCreation", allowAutoTopicCreation)
.add("topicType", topicType).add("defaultNumPartitions", defaultNumPartitions).toString();
}
public static boolean isValidOverride(AutoTopicCreationOverride override) {
if (override == null) {
return false;
}
if (override.allowAutoTopicCreation) {
if (!TopicType.isValidTopicType(override.topicType)) {
return false;
}
if (TopicType.PARTITIONED.toString().equals(override.topicType)) {
if (override.defaultNumPartitions == null) {
return false;
}
if (!(override.defaultNumPartitions > 0)) {
return false;
}
} else if (TopicType.NON_PARTITIONED.toString().equals(override.topicType)) {
if (override.defaultNumPartitions != null) {
return false;
}
}
}
return true;
}
}
......@@ -49,6 +49,8 @@ public class Policies {
// If set, it will override the broker settings for enabling deduplication
public Boolean deduplicationEnabled = null;
// If set, it will override the broker settings for allowing auto topic creation
public AutoTopicCreationOverride autoTopicCreationOverride = null;
public Map<String, PublishRate> publishMaxMessageRate = Maps.newHashMap();
@SuppressWarnings("checkstyle:MemberName")
......@@ -110,7 +112,7 @@ public class Policies {
return Objects.hash(auth_policies, replication_clusters,
backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
clusterSubscribeRate, deduplicationEnabled, persistence,
clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, persistence,
bundles, latency_stats_sample_rate,
message_ttl_in_seconds, retention_policies,
encryption_required, delayed_delivery_policies,
......@@ -141,6 +143,7 @@ public class Policies {
&& Objects.equals(clusterSubscribeRate, other.clusterSubscribeRate)
&& Objects.equals(publishMaxMessageRate, other.publishMaxMessageRate)
&& Objects.equals(deduplicationEnabled, other.deduplicationEnabled)
&& Objects.equals(autoTopicCreationOverride, other.autoTopicCreationOverride)
&& Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles)
&& Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate)
&& Objects.equals(message_ttl_in_seconds,
......@@ -190,6 +193,7 @@ public class Policies {
.add("replication_clusters", replication_clusters).add("bundles", bundles)
.add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
.add("deduplicationEnabled", deduplicationEnabled)
.add("autoTopicCreationOverride", autoTopicCreationOverride)
.add("clusterDispatchRate", clusterDispatchRate)
.add("topicDispatchRate", topicDispatchRate)
.add("subscriptionDispatchRate", subscriptionDispatchRate)
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;
/**
* Topic types -- partitioned or non-partitioned.
*/
public enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
private String type;
TopicType(String type) {
this.type = type;
}
public String toString() {
return type;
}
public static boolean isValidTopicType(String type) {
for (TopicType topicType : TopicType.values()) {
if (topicType.toString().equalsIgnoreCase(type)) {
return true;
}
}
return false;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;
import org.testng.annotations.Test;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class AutoTopicCreationOverrideTest {
@Test
public void testValidOverrideNonPartitioned() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null);
assertTrue(AutoTopicCreationOverride.isValidOverride(override));
}
@Test
public void testValidOverridePartitioned() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 2);
assertTrue(AutoTopicCreationOverride.isValidOverride(override));
}
@Test
public void testInvalidTopicType() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, "aaa", null);
assertFalse(AutoTopicCreationOverride.isValidOverride(override));
}
@Test
public void testNumPartitionsTooLow() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 0);
assertFalse(AutoTopicCreationOverride.isValidOverride(override));
}
@Test
public void testNumPartitionsNotSet() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), null);
assertFalse(AutoTopicCreationOverride.isValidOverride(override));
}
@Test
public void testNumPartitionsOnNonPartitioned() {
AutoTopicCreationOverride override = new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), 2);
assertFalse(AutoTopicCreationOverride.isValidOverride(override));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册