diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/client/cli/CmdConsume.java index 52ea553787dc34f5cb1c02eb27360611e77b9e0f..abd7b9c62d173a27725d2f522fc5c6f40bf4b91d 100644 --- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/client/cli/CmdConsume.java @@ -130,10 +130,11 @@ public class CmdConsume { RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null; while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) { - if (limiter != null) + if (limiter != null) { limiter.acquire(); + } - Message msg = consumer.receive(20, TimeUnit.SECONDS); + Message msg = consumer.receive(5, TimeUnit.SECONDS); if (msg == null) { LOG.warn("No message to consume after waiting for 20 seconds."); } else { diff --git a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/client/cli/PulsarClientToolTest.java index 0a7fee7310331d4e7574af0fee3e3a23e16bcf95..ba7c43eb20413b4ea4ba427e5999a2f4f19d3dd2 100644 --- a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/client/cli/PulsarClientToolTest.java @@ -15,15 +15,12 @@ */ package com.yahoo.pulsar.client.cli; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; import java.net.MalformedURLException; import java.util.Properties; -import java.lang.InterruptedException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -31,7 +28,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.yahoo.pulsar.broker.service.BrokerTestBase; -import com.yahoo.pulsar.client.cli.PulsarClientTool; @Test public class PulsarClientToolTest extends BrokerTestBase { @@ -56,30 +52,31 @@ public class PulsarClientToolTest extends BrokerTestBase { String topicName = "persistent://property/ns/topic-scale-ns-0/topic"; - int numberOfMessages = 100; + int numberOfMessages = 10; - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(() -> { + CompletableFuture future = new CompletableFuture(); + executor.execute(() -> { PulsarClientTool pulsarClientToolConsumer; try { pulsarClientToolConsumer = new PulsarClientTool(properties); String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n", - Integer.toString(numberOfMessages), "--hex", "-r", "100", topicName }; + Integer.toString(numberOfMessages), "--hex", "-r", "10", topicName }; Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); - } catch (MalformedURLException e) { - Assert.fail("Exception : " + e.getMessage()); - return false; + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); } - return true; }); PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", - "200", topicName }; + "20", topicName }; Assert.assertEquals(pulsarClientToolProducer.run(args), 0); - Assert.assertTrue(future.get(), "Exception occured while running consume task."); + future.get(); + executor.shutdown(); } }