diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b50eee6ec6255d17d41101e0aae4ecc0bf1a4ecd..bdcc6e3742e0454e44928bb7606dc98b7a4fe4c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -670,6 +670,11 @@ public class ServerCnx extends PulsarHandler { ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; + }).exceptionally(e -> { + String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); + log.warn(msg); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); + return null; }); } else { final String msg = "Proxy Client is not authorized to subscribe"; @@ -856,6 +861,11 @@ public class ServerCnx extends PulsarHandler { ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; + }).exceptionally(e -> { + String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); + log.warn(msg); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); + return null; }); } else { final String msg = "Proxy Client is not authorized to Produce"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 4f4fcd6a19ebadf6e345eab33f6988aedf1ce8d8..465cc6d52fb96f3ec32ea968fe00ca36674db65a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.mockito.Mockito.spy; import java.io.IOException; @@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -147,6 +149,46 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @Test + public void testSubscriptionPrefixAuthorization() throws Exception { + log.info("-- Starting {} test --", methodName); + + conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName()); + setup(); + + ClientConfiguration adminConf = new ClientConfiguration(); + Authentication adminAuthentication = new ClientAuthentication("superUser"); + adminConf.setAuthentication(adminAuthentication); + admin = spy(new PulsarAdmin(brokerUrl, adminConf)); + + String lookupUrl; + lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + + ClientConfiguration clientConfValid = new ClientConfiguration(); + Authentication authentication = new ClientAuthentication(clientRole); + clientConfValid.setAuthentication(authentication); + + pulsarClient = PulsarClient.create(lookupUrl, clientConfValid); + + admin.properties().createProperty("prop-prefix", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("prop-prefix/use/ns"); + + // (1) Valid subscription name will be approved by authorization service + Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1"); + consumer.close(); + + // (2) InValid subscription name will be rejected by authorization service + try { + consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1"); + Assert.fail("should have failed with authorization error"); + } catch (PulsarClientException.AuthorizationException pa) { + // Ok + } + + log.info("-- Exiting {} test --", methodName); + } + @Test public void testGrantPermission() throws Exception { log.info("-- Starting {} test --", methodName); @@ -337,6 +379,24 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { } } + public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider { + + @Override + public CompletableFuture canConsumeAsync(DestinationName destination, String role, + AuthenticationDataSource authenticationData, String subscription) { + CompletableFuture future = new CompletableFuture<>(); + if (isNotBlank(subscription)) { + if (!subscription.startsWith(role)) { + future.completeExceptionally(new PulsarServerException( + "The subscription name needs to be prefixed by the authentication role")); + } + } + future.complete(clientRole.equals(role)); + return future; + } + + } + public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider { private Set grantRoles = Sets.newHashSet();