提交 3047d1c2 编写于 作者: N nkurihar 提交者: Yuki Shiga

Add empty check for subscription name (#559)

上级 ae6ca6ba
......@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isBlank;
import java.time.Instant;
import java.time.ZoneId;
......@@ -349,6 +350,14 @@ public class PersistentTopic implements Topic, AddEntryCallback {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
}
future.completeExceptionally(new NamingException("Empty subscription name"));
return future;
}
if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
......
......@@ -353,6 +353,25 @@ public class PersistentTopicTest {
topic.removeProducer(producer); /* noop */
}
@Test
public void testSubscribeFail() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
// Empty subscription name
CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName)
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
try {
f1.get();
fail("should fail with exception");
} catch (ExecutionException ee) {
// Expected
assertTrue(ee.getCause() instanceof BrokerServiceException.NamingException);
}
}
@Test
public void testSubscribeUnsubscribe() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
......
......@@ -448,6 +448,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
}
try {
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", "",
consumerConf);
Assert.fail("Should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
}
try {
Consumer consumer = pulsarClient.subscribe("invalid://topic7", "my-subscriber-name", consumerConf);
Assert.fail("Should fail");
......
......@@ -48,6 +48,7 @@ import org.apache.pulsar.websocket.stats.ProxyTopicStat;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.glassfish.jersey.client.ClientConfig;
......@@ -169,6 +170,44 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
@Test(timeOut = 10000)
public void badConsumerTest() throws Exception {
// Empty subcription name
String consumerUri = "ws://localhost:" + port
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive";
URI consumeUri = URI.create(consumerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
consumerFuture1.get();
Assert.fail("should fail: empty subscription");
} catch (Exception e) {
// Expected
Assert.assertTrue(e.getCause() instanceof UpgradeException);
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
executor.submit(() -> {
try {
consumeClient1.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}
/**
* It verifies proxy topic-stats and proxy-metrics api
*
......
......@@ -56,6 +56,8 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
......@@ -233,9 +235,9 @@ public class PulsarClientImpl implements PulsarClient {
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
if (subscription == null) {
if (isBlank(subscription)) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid subscription name"));
.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
}
if (conf == null) {
return FutureUtil.failedFuture(
......
......@@ -267,6 +267,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
checkArgument(parts.size() == 9, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
checkArgument(parts.get(3).equals("persistent"));
checkArgument(parts.get(8).length() > 0, "Empty subscription name");
return parts.get(8);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册