提交 895509da 编写于 作者: R Rajan 提交者: Matteo Merli

Handle unknown exception on authorization check (#270)

* Handle unknown exception on authorization check

* fix test:init AuthorizationManager on Auth-enable and set status-500 on topic-lookup-internal-failire

* fail partitionMetadata on zk-failure
上级 9d274a11
......@@ -53,13 +53,13 @@ public class AuthorizationManager {
return checkAuthorization(destination, role, AuthAction.produce);
}
public boolean canProduce(DestinationName destination, String role) {
public boolean canProduce(DestinationName destination, String role) throws Exception {
try {
return canProduceAsync(destination, role).get();
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
throw e;
}
}
......@@ -76,13 +76,13 @@ public class AuthorizationManager {
return checkAuthorization(destination, role, AuthAction.consume);
}
public boolean canConsume(DestinationName destination, String role) {
public boolean canConsume(DestinationName destination, String role) throws Exception {
try {
return canConsumeAsync(destination, role).get();
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
throw e;
}
}
......@@ -94,8 +94,9 @@ public class AuthorizationManager {
* @param destination
* @param role
* @return
* @throws Exception
*/
public boolean canLookup(DestinationName destination, String role) {
public boolean canLookup(DestinationName destination, String role) throws Exception {
return canProduce(destination, role) || canConsume(destination, role);
}
......@@ -156,12 +157,12 @@ public class AuthorizationManager {
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.complete(false);
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
permissionFuture.complete(false);
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
}
......
......@@ -994,8 +994,13 @@ public class PersistentTopics extends AdminResource {
try {
checkConnect(dn);
} catch (RestException e) {
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(dn.getProperty());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
......@@ -1024,6 +1029,12 @@ public class PersistentTopics extends AdminResource {
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// unknown error marked as internal server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
dn.toString(), ex.getMessage(), ex);
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), ex.getMessage()));
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
......@@ -1072,7 +1083,7 @@ public class PersistentTopics extends AdminResource {
metadataFuture.complete(new PartitionedTopicMetadata());
}
}).exceptionally(ex -> {
metadataFuture.complete(new PartitionedTopicMetadata());
metadataFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
......
......@@ -41,6 +41,8 @@ import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.client.api.PulsarClientException;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
......@@ -70,10 +72,15 @@ public class DestinationLookup extends PulsarWebResource {
validateClusterOwnership(topic.getCluster());
checkConnect(topic);
validateReplicationSettingsOnNamespace(pulsar(), topic.getNamespaceObject());
} catch (Throwable t) {
} catch (WebApplicationException we) {
// Validation checks failed
log.error("Validation check failed: {}", t.getMessage());
asyncResponse.resume(t);
log.error("Validation check failed: {}", we.getMessage());
asyncResponse.resume(we);
return;
} catch (Throwable t) {
// Validation checks failed with unknown error
log.error("Validation check failed: {}", t.getMessage(), t);
asyncResponse.resume(new RestException(t));
return;
}
......@@ -162,10 +169,14 @@ public class DestinationLookup extends PulsarWebResource {
// (2) authorize client
try {
checkAuthorization(pulsarService, fqdn, clientAppId);
} catch (Exception e) {
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString());
validationFuture
.complete(newLookupResponse(ServerError.AuthorizationError, e.getMessage(), requestId));
validationFuture.complete(
newLookupResponse(ServerError.AuthorizationError, authException.getMessage(), requestId));
return;
} catch (Exception e) {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, fqdn.toString());
validationFuture.completeExceptionally(e);
return;
}
// (3) validate global namespace
......
......@@ -417,8 +417,15 @@ public class Consumer {
public void checkPermissions() {
DestinationName destination = DestinationName.get(subscription.getDestination());
if (cnx.getBrokerService().getAuthorizationManager() != null
&& !cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
if (cnx.getBrokerService().getAuthorizationManager() != null) {
try {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
return;
}
} catch (Exception e) {
log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, subscription.getDestination(),
e.getMessage(), e);
}
log.info("[{}] is not allowed to consume from Destination" + " [{}] anymore", appId,
subscription.getDestination());
disconnect();
......
......@@ -350,9 +350,16 @@ public class Producer {
public void checkPermissions() {
DestinationName destination = DestinationName.get(topic.getName());
if (cnx.getBrokerService().getAuthorizationManager() != null
&& !cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) {
log.info("[{}] is not allowed to consume from destination [{}] anymore", appId, topic.getName());
if (cnx.getBrokerService().getAuthorizationManager() != null) {
try {
if (cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) {
return;
}
} catch (Exception e) {
log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, topic.getName(), e.getMessage(),
e);
}
log.info("[{}] is not allowed to produce from destination [{}] anymore", appId, topic.getName());
disconnect();
}
}
......
......@@ -561,11 +561,11 @@ public abstract class PulsarWebResource {
return validationFuture;
}
protected void checkConnect(DestinationName destination) {
protected void checkConnect(DestinationName destination) throws RestException, Exception {
checkAuthorization(pulsar(), destination, clientAppId());
}
protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) {
protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) throws RestException, Exception{
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
return;
......@@ -579,11 +579,6 @@ public abstract class PulsarWebResource {
} catch (RestException e) {
// Let it through
throw e;
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Error in authorizing lookup. destination={}, role={}. Error: {}", destination, role,
e.getMessage(), e);
throw new RestException(e);
}
}
......
......@@ -54,7 +54,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
}
@Test
void simple() throws PulsarAdminException {
void simple() throws Exception {
AuthorizationManager auth = pulsar.getBrokerService().getAuthorizationManager();
assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
......
......@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.InternalServerErrorException;
import org.junit.Assert;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
......@@ -71,7 +72,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
conf.setSuperUserRoles(superUserRoles);
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
conf.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
......@@ -117,7 +119,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
authTls.configure(authParams);
internalSetup(authTls);
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(),brokerUrlTls.toString(),"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
......@@ -127,7 +130,6 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
if (batchMessageDelayMs != 0) {
......@@ -156,7 +158,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
/**
* Verifies: on 500 server error, broker invalidates session and client receives 500 correctly.
*
......@@ -187,4 +189,45 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
/**
* verifies that topicLookup/PartitionMetadataLookup gives InternalServerError(500) instead 401(auth_failed) on
* unknown-exception failure
*
* @throws Exception
*/
@Test
public void testInternalServerExceptionOnLookup() throws Exception {
log.info("-- Starting {} test --", methodName);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
internalSetup(authTls);
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
String namespace = "my-property/use/my-ns";
admin.namespaces().createNamespace(namespace);
String destination = "persistent://" + namespace + "1/topic1";
// this will cause NPE and it should throw 500
mockZookKeeper.shutdown();
pulsar.getConfiguration().setSuperUserRoles(Sets.newHashSet());
try {
admin.persistentTopics().getPartitionedTopicMetadata(destination);
} catch (PulsarAdminException e) {
Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
}
try {
admin.lookups().lookupDestination(destination);
} catch (PulsarAdminException e) {
Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
}
}
}
......@@ -210,6 +210,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// enable authorization: so, broker can validate cluster and redirect if finds different cluster
pulsar.getConfiguration().setAuthorizationEnabled(true);
// restart broker with authorization enabled: it initialize AuthorizationManager
stopBroker();
startBroker();
LoadManager loadManager2 = spy(pulsar2.getLoadManager());
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
......
......@@ -73,7 +73,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
}
@Test
public void test() throws PulsarAdminException {
public void test() throws Exception {
AuthorizationManager auth = service.getAuthorizationManager();
assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
......
......@@ -119,7 +119,7 @@ public class BrokerDiscoveryProvider implements Closeable {
}
protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role)
throws IllegalAccessException {
throws Exception {
if (!service.getConfiguration().isAuthorizationEnabled()
|| service.getConfiguration().getSuperUserRoles().contains(role)) {
// No enforcing of authorization policies
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册