提交 44fd8265 编写于 作者: M massakam 提交者: Matteo Merli

Fix bug that consumer which specify incorrect subscription hangs up w… (#1256)

* Fix bug that consumer which specify incorrect subscription hangs up when subscription_auth_mode is Prefix

* Add test for subscription prefix authorization
上级 e13bdb32
......@@ -670,6 +670,11 @@ public class ServerCnx extends PulsarHandler {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(e -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
log.warn(msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
return null;
});
} else {
final String msg = "Proxy Client is not authorized to subscribe";
......@@ -856,6 +861,11 @@ public class ServerCnx extends PulsarHandler {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(e -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
log.warn(msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
return null;
});
} else {
final String msg = "Proxy Client is not authorized to Produce";
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import java.io.IOException;
......@@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
......@@ -147,6 +149,46 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
setup();
ClientConfiguration adminConf = new ClientConfiguration();
Authentication adminAuthentication = new ClientAuthentication("superUser");
adminConf.setAuthentication(adminAuthentication);
admin = spy(new PulsarAdmin(brokerUrl, adminConf));
String lookupUrl;
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
ClientConfiguration clientConfValid = new ClientConfiguration();
Authentication authentication = new ClientAuthentication(clientRole);
clientConfValid.setAuthentication(authentication);
pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
admin.properties().createProperty("prop-prefix",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("prop-prefix/use/ns");
// (1) Valid subscription name will be approved by authorization service
Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1");
consumer.close();
// (2) InValid subscription name will be rejected by authorization service
try {
consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
Assert.fail("should have failed with authorization error");
} catch (PulsarClientException.AuthorizationException pa) {
// Ok
}
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testGrantPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -337,6 +379,24 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
}
public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
@Override
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
if (isNotBlank(subscription)) {
if (!subscription.startsWith(role)) {
future.completeExceptionally(new PulsarServerException(
"The subscription name needs to be prefixed by the authentication role"));
}
}
future.complete(clientRole.equals(role));
return future;
}
}
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
private Set<String> grantRoles = Sets.newHashSet();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册