提交 0797c4d3 编写于 作者: M Matteo Merli 提交者: GitHub

Fixed tests timeouts for PartitionedProducerConsumerTest (#500)

上级 5b93f10d
......@@ -22,6 +22,7 @@ import static org.testng.Assert.fail;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -38,23 +39,30 @@ import com.yahoo.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import com.yahoo.pulsar.client.impl.PartitionedProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import io.netty.util.concurrent.DefaultThreadFactory;
public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
private ExecutorService executor;
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("PartitionedProducerConsumerTest"));
}
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
executor.shutdown();
}
@Test
@Test(timeOut = 30000)
public void testRoundRobinProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -95,8 +103,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test
@Test(timeOut = 30000)
public void testPartitionedTopicNameWithSpecialCharacter() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -115,7 +123,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test
@Test(timeOut = 30000)
public void testSinglePartitionProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -159,7 +167,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test
@Test(timeOut = 30000)
public void testKeyBasedProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -216,7 +224,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
Assert.assertTrue(messageSet.add(message), "Received duplicate message " + message);
}
@Test
@Test(timeOut = 30000)
public void testInvalidSequence() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -266,7 +274,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
@Test
@Test(timeOut = 30000)
public void testSillyUser() throws Exception {
int numPartitions = 4;
......@@ -328,7 +336,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
@Test
@Test(timeOut = 30000)
public void testDeletePartitionedTopic() throws Exception {
int numPartitions = 4;
DestinationName dn = DestinationName.get("persistent://my-property/use/my-ns/my-partitionedtopic6");
......@@ -348,7 +356,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
}
@Test(timeOut = 4000)
@Test(timeOut = 30000)
public void testAsyncPartitionedProducerConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -381,7 +389,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
// receive messages
CountDownLatch latch = new CountDownLatch(totalMsg);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, Executors.newFixedThreadPool(1));
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
latch.await();
......@@ -399,7 +407,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 4000)
@Test(timeOut = 30000)
public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -432,7 +440,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
// receive messages
CountDownLatch latch = new CountDownLatch(totalMsg);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, Executors.newFixedThreadPool(1));
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
latch.await();
......@@ -452,10 +460,10 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
/**
* It verifies that consumer consumes from all the partitions fairly.
*
*
* @throws Exception
*/
@Test
@Test(timeOut = 30000)
public void testFairDistributionForPartitionConsumers() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -513,7 +521,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch,
final Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
if (currentMessage < totalMessage) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册