From 10ac6c163b40bc07ce79f7c6122a02de2f8037a2 Mon Sep 17 00:00:00 2001 From: Yuki Shiga Date: Tue, 13 Jun 2017 08:09:05 +0900 Subject: [PATCH] WebSocket proxy should not make a consumer/producer when authorization is failed (#448) --- .../websocket/AbstractWebSocketHandler.java | 27 ++++++++++++------- .../pulsar/websocket/ConsumerHandler.java | 8 +++--- .../pulsar/websocket/ProducerHandler.java | 11 ++++---- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java index 20e257c9214..feacd1ec4bf 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java @@ -63,11 +63,11 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen if (service.isAuthenticationEnabled()) { try { authRole = service.getAuthenticationService().authenticateHttpRequest(request); - log.info("[{}] Authenticated WebSocket producer {} on topic {}", session.getRemoteAddress(), authRole, + log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole, topic); } catch (AuthenticationException e) { - log.warn("[{}] Failed to authenticated WebSocket producer {} on topic {}: {}", + log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}", session.getRemoteAddress(), authRole, topic, e.getMessage()); close(WebSocketError.AuthenticationError); return; @@ -75,16 +75,21 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen } if (service.isAuthorizationEnabled()) { - final String role = authRole; - isAuthorized(authRole).thenApply(isAuthorized -> { - if(!isAuthorized) { - log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role, + try { + if (!isAuthorized(authRole)) { + log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole, topic); close(WebSocketError.NotAuthorizedError); + return; } - return null; - }); + } catch (Exception e) { + log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}", + session.getRemoteAddress(), authRole, topic, e.getMessage()); + close(WebSocketError.UnknownError); + return; + } } + createClient(session); } @Override @@ -125,8 +130,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen return null; } - protected abstract CompletableFuture isAuthorized(String authRole); - private String extractTopicName(HttpServletRequest request) { String uri = request.getRequestURI(); List parts = Splitter.on("/").splitToList(uri); @@ -143,5 +146,9 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen return dn.toString(); } + protected abstract Boolean isAuthorized(String authRole) throws Exception; + + protected abstract void createClient(Session session); + private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class); } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 7f489f613f4..785dcc75108 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -87,9 +87,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); this.service.addConsumer(this); @@ -247,8 +245,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } @Override - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole); + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); } private static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index 6ede4e2d0e4..1053eb045f1 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -88,14 +88,14 @@ public class ProducerHandler extends AbstractWebSocketHandler { } @Override - public void onWebSocketConnect(Session session) { - super.onWebSocketConnect(session); - + protected void createClient(Session session) { try { ProducerConfiguration conf = getProducerConfiguration(); this.producer = service.getPulsarClient().createProducer(topic, conf); this.service.addProducer(this); } catch (Exception e) { + log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(), + topic, e); close(FailedToCreateProducer, e.getMessage()); } } @@ -176,8 +176,9 @@ public class ProducerHandler extends AbstractWebSocketHandler { return MSG_PUBLISHED_COUNTER_UPDATER.get(this); } - protected CompletableFuture isAuthorized(String authRole) { - return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); + @Override + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole); } private void sendAckResponse(ProducerAck response) { -- GitLab