diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java index 96f9c7e04b79f9c06e0cfb06e099ea4831956f6f..98641387ee039447f9d1c3f654a5fc9f997edfeb 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java @@ -53,13 +53,13 @@ public class AuthorizationManager { return checkAuthorization(destination, role, AuthAction.produce); } - public boolean canProduce(DestinationName destination, String role) { + public boolean canProduce(DestinationName destination, String role) throws Exception { try { return canProduceAsync(destination, role).get(); } catch (Exception e) { log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role, destination, e); - return false; + throw e; } } @@ -76,13 +76,13 @@ public class AuthorizationManager { return checkAuthorization(destination, role, AuthAction.consume); } - public boolean canConsume(DestinationName destination, String role) { + public boolean canConsume(DestinationName destination, String role) throws Exception { try { return canConsumeAsync(destination, role).get(); } catch (Exception e) { log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role, destination, e); - return false; + throw e; } } @@ -94,8 +94,9 @@ public class AuthorizationManager { * @param destination * @param role * @return + * @throws Exception */ - public boolean canLookup(DestinationName destination, String role) { + public boolean canLookup(DestinationName destination, String role) throws Exception { return canProduce(destination, role) || canConsume(destination, role); } @@ -156,12 +157,12 @@ public class AuthorizationManager { }).exceptionally(ex -> { log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, ex); - permissionFuture.complete(false); + permissionFuture.completeExceptionally(ex); return null; }); } catch (Exception e) { log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e); - permissionFuture.complete(false); + permissionFuture.completeExceptionally(e); } return permissionFuture; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java index b01d79c18fa276dfc3b00197e3d9cda506c0b63a..794cf9783cdb4afe92a1289c5e4bf3e05ea8e2bb 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java @@ -994,8 +994,13 @@ public class PersistentTopics extends AdminResource { try { checkConnect(dn); - } catch (RestException e) { + } catch (WebApplicationException e) { validateAdminAccessOnProperty(dn.getProperty()); + } catch (Exception e) { + // unknown error marked as internal server error + log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination, + clientAppId(), e.getMessage(), e); + throw new RestException(e); } String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(), @@ -1024,6 +1029,12 @@ public class PersistentTopics extends AdminResource { throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s", clientAppId, dn.toString(), authException.getMessage())); } + } catch (Exception ex) { + // unknown error marked as internal server error + log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId, + dn.toString(), ex.getMessage(), ex); + throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s", + clientAppId, dn.toString(), ex.getMessage())); } String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName()); @@ -1072,7 +1083,7 @@ public class PersistentTopics extends AdminResource { metadataFuture.complete(new PartitionedTopicMetadata()); } }).exceptionally(ex -> { - metadataFuture.complete(new PartitionedTopicMetadata()); + metadataFuture.completeExceptionally(ex); return null; }); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java index 3bf223e738b2f655133e310282d18a88b2efa4eb..019da964b88e359982f8907ed93cd198b9be57ef 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java @@ -41,6 +41,8 @@ import com.yahoo.pulsar.broker.web.PulsarWebResource; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.web.RestException; +import com.yahoo.pulsar.client.api.PulsarClientException; + import static com.yahoo.pulsar.common.api.Commands.newLookupResponse; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; @@ -70,10 +72,15 @@ public class DestinationLookup extends PulsarWebResource { validateClusterOwnership(topic.getCluster()); checkConnect(topic); validateReplicationSettingsOnNamespace(pulsar(), topic.getNamespaceObject()); - } catch (Throwable t) { + } catch (WebApplicationException we) { // Validation checks failed - log.error("Validation check failed: {}", t.getMessage()); - asyncResponse.resume(t); + log.error("Validation check failed: {}", we.getMessage()); + asyncResponse.resume(we); + return; + } catch (Throwable t) { + // Validation checks failed with unknown error + log.error("Validation check failed: {}", t.getMessage(), t); + asyncResponse.resume(new RestException(t)); return; } @@ -162,10 +169,14 @@ public class DestinationLookup extends PulsarWebResource { // (2) authorize client try { checkAuthorization(pulsarService, fqdn, clientAppId); - } catch (Exception e) { + } catch (RestException authException) { log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString()); - validationFuture - .complete(newLookupResponse(ServerError.AuthorizationError, e.getMessage(), requestId)); + validationFuture.complete( + newLookupResponse(ServerError.AuthorizationError, authException.getMessage(), requestId)); + return; + } catch (Exception e) { + log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, fqdn.toString()); + validationFuture.completeExceptionally(e); return; } // (3) validate global namespace diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 6afeffe514d9d463b7dfbf4fd139cc8598fe6df1..994a7c703086e6a1a8f54edffbf28d2c8df37a69 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -417,8 +417,15 @@ public class Consumer { public void checkPermissions() { DestinationName destination = DestinationName.get(subscription.getDestination()); - if (cnx.getBrokerService().getAuthorizationManager() != null - && !cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) { + if (cnx.getBrokerService().getAuthorizationManager() != null) { + try { + if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) { + return; + } + } catch (Exception e) { + log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, subscription.getDestination(), + e.getMessage(), e); + } log.info("[{}] is not allowed to consume from Destination" + " [{}] anymore", appId, subscription.getDestination()); disconnect(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java index b961b6b9e49497bcd290ba5de91c19e82d5f5d30..e3a1171574c9dd7cd219f9a8101c9343e6956ebc 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java @@ -350,9 +350,16 @@ public class Producer { public void checkPermissions() { DestinationName destination = DestinationName.get(topic.getName()); - if (cnx.getBrokerService().getAuthorizationManager() != null - && !cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) { - log.info("[{}] is not allowed to consume from destination [{}] anymore", appId, topic.getName()); + if (cnx.getBrokerService().getAuthorizationManager() != null) { + try { + if (cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) { + return; + } + } catch (Exception e) { + log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, topic.getName(), e.getMessage(), + e); + } + log.info("[{}] is not allowed to produce from destination [{}] anymore", appId, topic.getName()); disconnect(); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java index d520945b1210f187bec21c4f883fb1c49891b97e..7f94c290eb94b5597cbf865e4e5ed2ee7176774e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java @@ -561,11 +561,11 @@ public abstract class PulsarWebResource { return validationFuture; } - protected void checkConnect(DestinationName destination) { + protected void checkConnect(DestinationName destination) throws RestException, Exception { checkAuthorization(pulsar(), destination, clientAppId()); } - protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) { + protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) throws RestException, Exception{ if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { // No enforcing of authorization policies return; @@ -579,11 +579,6 @@ public abstract class PulsarWebResource { } catch (RestException e) { // Let it through throw e; - } catch (Exception e) { - // unknown error marked as internal server error - log.warn("Error in authorizing lookup. destination={}, role={}. Error: {}", destination, role, - e.getMessage(), e); - throw new RestException(e); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/AuthorizationTest.java index d527e39d1bdc42965b5836030882538ee74fba3a..19c65bd78acb7229562f850c4e4b8c54ca9ef046 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/AuthorizationTest.java @@ -54,7 +54,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { } @Test - void simple() throws PulsarAdminException { + void simple() throws Exception { AuthorizationManager auth = pulsar.getBrokerService().getAuthorizationManager(); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java index ea1d7d7455105b4f0d34f68d08ad3009c93d6042..ca379ecee9ae8dc9101e42b003a10c54db541e79 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import javax.ws.rs.InternalServerErrorException; import org.junit.Assert; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -71,7 +72,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { conf.setSuperUserRoles(superUserRoles); conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); - conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH); + conf.setBrokerClientAuthenticationParameters( + "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH); Set providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); @@ -117,7 +119,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { authTls.configure(authParams); internalSetup(authTls); - admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(),brokerUrlTls.toString(),"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); admin.namespaces().createNamespace("my-property/use/my-ns"); @@ -127,7 +130,6 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", conf); - ProducerConfiguration producerConf = new ProducerConfiguration(); if (batchMessageDelayMs != 0) { @@ -156,7 +158,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { consumer.close(); log.info("-- Exiting {} test --", methodName); } - + /** * Verifies: on 500 server error, broker invalidates session and client receives 500 correctly. * @@ -187,4 +189,45 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + /** + * verifies that topicLookup/PartitionMetadataLookup gives InternalServerError(500) instead 401(auth_failed) on + * unknown-exception failure + * + * @throws Exception + */ + @Test + public void testInternalServerExceptionOnLookup() throws Exception { + log.info("-- Starting {} test --", methodName); + + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + internalSetup(authTls); + + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); + admin.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + String namespace = "my-property/use/my-ns"; + admin.namespaces().createNamespace(namespace); + + String destination = "persistent://" + namespace + "1/topic1"; + // this will cause NPE and it should throw 500 + mockZookKeeper.shutdown(); + pulsar.getConfiguration().setSuperUserRoles(Sets.newHashSet()); + try { + admin.persistentTopics().getPartitionedTopicMetadata(destination); + } catch (PulsarAdminException e) { + Assert.assertTrue(e.getCause() instanceof InternalServerErrorException); + } + try { + admin.lookups().lookupDestination(destination); + } catch (PulsarAdminException e) { + Assert.assertTrue(e.getCause() instanceof InternalServerErrorException); + } + + } + } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java index 4a22a6470cf8b949461987f414fc10fab13ad4b1..632df94cdaa444d11cb7de5e29c4c6ec5f6d6f0c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java @@ -210,6 +210,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { // enable authorization: so, broker can validate cluster and redirect if finds different cluster pulsar.getConfiguration().setAuthorizationEnabled(true); + // restart broker with authorization enabled: it initialize AuthorizationManager + stopBroker(); + startBroker(); LoadManager loadManager2 = spy(pulsar2.getLoadManager()); Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 14057c88e9bcc884e792f92e59574e11e8e07b51..5c7839d780201ae8eb01aa75b555541c28e4e889 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -73,7 +73,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { } @Test - public void test() throws PulsarAdminException { + public void test() throws Exception { AuthorizationManager auth = service.getAuthorizationManager(); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false); diff --git a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java index e465cb74dd809af46fc6baa5e253e6f93a7513c2..a09a26242be85ebaacc5646faf3dd48a79cdb527 100644 --- a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -119,7 +119,7 @@ public class BrokerDiscoveryProvider implements Closeable { } protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role) - throws IllegalAccessException { + throws Exception { if (!service.getConfiguration().isAuthorizationEnabled() || service.getConfiguration().getSuperUserRoles().contains(role)) { // No enforcing of authorization policies