提交 10ac6c16 编写于 作者: Y Yuki Shiga 提交者: Matteo Merli

WebSocket proxy should not make a consumer/producer when authorization is failed (#448)

上级 019e5f3b
......@@ -63,11 +63,11 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
if (service.isAuthenticationEnabled()) {
try {
authRole = service.getAuthenticationService().authenticateHttpRequest(request);
log.info("[{}] Authenticated WebSocket producer {} on topic {}", session.getRemoteAddress(), authRole,
log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole,
topic);
} catch (AuthenticationException e) {
log.warn("[{}] Failed to authenticated WebSocket producer {} on topic {}: {}",
log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.AuthenticationError);
return;
......@@ -75,16 +75,21 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
}
if (service.isAuthorizationEnabled()) {
final String role = authRole;
isAuthorized(authRole).thenApply(isAuthorized -> {
if(!isAuthorized) {
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role,
try {
if (!isAuthorized(authRole)) {
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole,
topic);
close(WebSocketError.NotAuthorizedError);
return;
}
return null;
});
} catch (Exception e) {
log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.UnknownError);
return;
}
}
createClient(session);
}
@Override
......@@ -125,8 +130,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return null;
}
protected abstract CompletableFuture<Boolean> isAuthorized(String authRole);
private String extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri);
......@@ -143,5 +146,9 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return dn.toString();
}
protected abstract Boolean isAuthorized(String authRole) throws Exception;
protected abstract void createClient(Session session);
private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
}
......@@ -87,9 +87,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
protected void createClient(Session session) {
try {
this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
this.service.addConsumer(this);
......@@ -247,8 +245,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
@Override
protected CompletableFuture<Boolean> isAuthorized(String authRole) {
return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole);
protected Boolean isAuthorized(String authRole) throws Exception {
return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
}
private static String extractSubscription(HttpServletRequest request) {
......
......@@ -88,14 +88,14 @@ public class ProducerHandler extends AbstractWebSocketHandler {
}
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
protected void createClient(Session session) {
try {
ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf);
this.service.addProducer(this);
} catch (Exception e) {
log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(),
topic, e);
close(FailedToCreateProducer, e.getMessage());
}
}
......@@ -176,8 +176,9 @@ public class ProducerHandler extends AbstractWebSocketHandler {
return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
}
protected CompletableFuture<Boolean> isAuthorized(String authRole) {
return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole);
@Override
protected Boolean isAuthorized(String authRole) throws Exception {
return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
}
private void sendAckResponse(ProducerAck response) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册