提交 afaa63fd 编写于 作者: R Rajan 提交者: Matteo Merli

Async authorization check while creation of producer/consumer (#98)

上级 15683fd0
......@@ -16,8 +16,8 @@
package com.yahoo.pulsar.broker.authorization;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -26,7 +26,6 @@ import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.AuthAction;
import com.yahoo.pulsar.common.policies.data.Policies;
/**
*/
......@@ -50,10 +49,20 @@ public class AuthorizationManager {
* @param role
* the app id used to send messages to the destination.
*/
public boolean canProduce(DestinationName destination, String role) {
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.produce);
}
public boolean canProduce(DestinationName destination, String role) {
try {
return canProduceAsync(destination, role).get();
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}
/**
* Check if the specified role has permission to receive messages from the specified fully qualified destination
* name.
......@@ -63,10 +72,20 @@ public class AuthorizationManager {
* @param role
* the app id used to receive messages from the destination.
*/
public boolean canConsume(DestinationName destination, String role) {
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.consume);
}
public boolean canConsume(DestinationName destination, String role) {
try {
return canConsumeAsync(destination, role).get();
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}
/**
* Check whether the specified role can perform a lookup for the specified destination.
*
......@@ -80,10 +99,14 @@ public class AuthorizationManager {
return canProduce(destination, role) || canConsume(destination, role);
}
private boolean checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role))
return true;
return checkPermission(destination, role, action) && checkCluster(destination);
private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
return checkPermission(destination, role, action)
.thenApply(isPermission -> isPermission && checkCluster(destination));
}
}
private boolean checkCluster(DestinationName destination) {
......@@ -98,38 +121,49 @@ public class AuthorizationManager {
}
}
public boolean checkPermission(DestinationName destination, String role, AuthAction action) {
public CompletableFuture<Boolean> checkPermission(DestinationName destination, String role,
AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
Optional<Policies> policies = configCache.policiesCache().get(POLICY_ROOT + destination.getNamespace());
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
}
return false;
}
permissionFuture.complete(false);
} else {
Set<AuthAction> namespaceActions = policies.get().auth_policies.namespace_auth.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
return true;
}
Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth.get(destination.toString());
permissionFuture.complete(true);
} else {
Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth
.get(destination.toString());
if (roles == null) {
// Destination has no custom policy
return false;
}
permissionFuture.complete(false);
} else {
Set<AuthAction> resourceActions = roles.get(role);
if (resourceActions != null && resourceActions.contains(action)) {
// The role has destination level permission
return true;
permissionFuture.complete(true);
} else {
permissionFuture.complete(false);
}
return false;
}
}
}
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.complete(false);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
return false;
permissionFuture.complete(false);
}
return permissionFuture;
}
/**
......
......@@ -181,19 +181,18 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
checkArgument(state == State.Connected);
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
if (service.getAuthorizationManager().canConsume(DestinationName.get(subscribe.getTopic()), authRole)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
}
authorizationFuture = service.getAuthorizationManager()
.canConsumeAsync(DestinationName.get(subscribe.getTopic()), authRole);
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, msg));
return;
authorizationFuture = CompletableFuture.completedFuture(true);
}
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
}
final String topicName = subscribe.getTopic();
final String subscriptionName = subscribe.getSubscription();
final long requestId = subscribe.getRequestId();
......@@ -211,22 +210,20 @@ public class ServerCnx extends PulsarHandler {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
return null;
} else {
// There was an early request to create a consumer with same consumerId. This can happen when client
// timeout
// is lower the broker timeouts. We need to wait until the previous consumer creation request either
// complete or fails.
// There was an early request to create a consumer with same consumerId. This can happen when
// client timeout is lower the broker timeouts. We need to wait until the previous consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName,
subscriptionName);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
"Consumer is already present on the connection"));
return;
return null;
}
}
service.getTopic(topicName)
.thenCompose(
service.getTopic(topicName).thenCompose(
topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, consumerName))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
......@@ -237,8 +234,8 @@ public class ServerCnx extends PulsarHandler {
// The consumer future was completed before by a close command
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}", remoteAddress,
consumer);
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn("[{}] Error closing consumer created after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
......@@ -248,10 +245,11 @@ public class ServerCnx extends PulsarHandler {
}) //
.exceptionally(exception -> {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
// If client timed out, the future would have been completed by subsequent close. Send error back to
// If client timed out, the future would have been completed by subsequent close. Send error
// back to
// client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
ctx.writeAndFlush(Commands.newError(requestId,
......@@ -263,25 +261,30 @@ public class ServerCnx extends PulsarHandler {
return null;
});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, msg));
}
return null;
});
}
@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
if (service.getAuthorizationManager().canProduce(DestinationName.get(cmdProducer.getTopic().toString()),
authRole)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
}
authorizationFuture = service.getAuthorizationManager()
.canProduceAsync(DestinationName.get(cmdProducer.getTopic().toString()), authRole);
} else {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg));
return;
authorizationFuture = CompletableFuture.completedFuture(true);
}
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
}
final String producerName;
if (cmdProducer.hasProducerName()) {
// Use producer name provided by client
......@@ -304,15 +307,17 @@ public class ServerCnx extends PulsarHandler {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
return;
return null;
} else {
// There was an early request to create a producer with same producerId. This can happen when client
// timeout is lower the broker timeouts. We need to wait until the previous producer creation request
// There was an early request to create a producer with same producerId. This can happen when
// client
// timeout is lower the broker timeouts. We need to wait until the previous producer creation
// request
// either complete or fails.
log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
"Producer is already present on the connection"));
return;
return null;
}
}
......@@ -325,10 +330,11 @@ public class ServerCnx extends PulsarHandler {
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage()));
ctx.writeAndFlush(Commands.newError(requestId,
ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage()));
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
ctx.writeAndFlush(
Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage()));
}
producerFuture.completeExceptionally(illegalStateException);
......@@ -356,14 +362,16 @@ public class ServerCnx extends PulsarHandler {
}
} else {
producer.closeNow();
log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress, producer);
log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress,
producer);
producerFuture.completeExceptionally(
new IllegalStateException("Producer created after connection was closed"));
}
} catch (BrokerServiceException ise) {
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, ise.getMessage());
ctx.writeAndFlush(
Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
ise.getMessage());
ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise),
ise.getMessage()));
producerFuture.completeExceptionally(ise);
}
......@@ -383,6 +391,13 @@ public class ServerCnx extends PulsarHandler {
}
producers.remove(producerId, producerFuture);
return null;
});
} else {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg));
}
return null;
});
}
......
......@@ -381,7 +381,7 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws Exception {
AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
doReturn(true).when(authorizationManager).canProduce(Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthenticationEnabled();
resetChannel();
......@@ -409,7 +409,7 @@ public class ServerCnxTest {
ConfigurationCacheService configCacheService = mock(ConfigurationCacheService.class);
doReturn(configCacheService).when(pulsar).getConfigurationCache();
doReturn(zkDataCache).when(configCacheService).policiesCache();
doThrow(new NoNodeException()).when(zkDataCache).get(matches(".*nonexistent.*"));
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));
AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
......@@ -440,7 +440,7 @@ public class ServerCnxTest {
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString());
doReturn(true).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(),
doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(),
any(AuthAction.class));
resetChannel();
......@@ -493,7 +493,7 @@ public class ServerCnxTest {
public void testProducerCommandWithAuthorizationNegative() throws Exception {
AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
doReturn(false).when(authorizationManager).canProduce(Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
......@@ -1022,7 +1022,7 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
doReturn(true).when(authorizationManager).canConsume(Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any());
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
......@@ -1042,7 +1042,7 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
doReturn(false).when(authorizationManager).canConsume(Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any());
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
......
......@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
......@@ -73,11 +74,16 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
}
}
if (service.isAuthorizationEnabled() && !isAuthorized(authRole)) {
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole,
if (service.isAuthorizationEnabled()) {
final String role = authRole;
isAuthorized(authRole).thenApply(isAuthorized -> {
if(!isAuthorized) {
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role,
topic);
close(WebSocketError.NotAuthorizedError);
return;
}
return null;
});
}
}
......@@ -120,7 +126,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return null;
}
protected abstract boolean isAuthorized(String authRole);
protected abstract CompletableFuture<Boolean> isAuthorized(String authRole);
private String extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI();
......
......@@ -24,6 +24,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -177,8 +178,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
@Override
protected boolean isAuthorized(String authRole) {
return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
protected CompletableFuture<Boolean> isAuthorized(String authRole) {
return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole);
}
private static String extractSubscription(HttpServletRequest request) {
......
......@@ -17,6 +17,7 @@ package com.yahoo.pulsar.websocket;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
......@@ -119,8 +120,8 @@ public class ProducerHandler extends AbstractWebSocketHandler {
});
}
protected boolean isAuthorized(String authRole) {
return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
protected CompletableFuture<Boolean> isAuthorized(String authRole) {
return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole);
}
private void sendAckResponse(ProducerAck response) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册