提交 d7460246 编写于 作者: R Rajan 提交者: Matteo Merli

Fix testConsumerBlockingWithUnAckedMessagesAndRedelivery : add strategically retry (#510)

上级 fde9b6cd
......@@ -133,8 +133,9 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
}
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 2);
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages: check
// delta as 3 consumers with receiverQueueSize = 10
assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 3);
// start acknowledging messages
messages.forEach((m, c) -> {
......@@ -145,19 +146,28 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
});
// wait to start dispatching-async
Thread.sleep(2000);
Thread.sleep(1000);
// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages.size();
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(msg, consumers[i]);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(msg, consumers[i]);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
}
if (messages.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
}
}
// total received-messages should match to produced messages
......@@ -258,9 +268,9 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
}
// check all unacked messages have been redelivered
// check all unacked messages have been redelivered: with delta 30: 3 consumers with receiverQueueSize=10
Set<MessageId> result = Sets.newHashSet(messages.values());
assertEquals(totalConsumedMsgs, result.size(), 2 * receiverQueueSize);
assertEquals(totalConsumedMsgs, result.size(), 3 * receiverQueueSize);
// start acknowledging messages
messages.asMap().forEach((c, msgs) -> {
......@@ -275,21 +285,29 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// now: dispatcher must be unblocked: wait to start dispatching-async
Thread.sleep(1000);
// try to consume remaining messages
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers[i].receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(consumers[i], msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
result = Sets.newHashSet(messages.values());
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
result.add(msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
}
if (result.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
}
}
result = Sets.newHashSet(messages.values());
// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, result.size());
producer.close();
......@@ -365,14 +383,23 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// close consumer1: all messages of consumer1 must be replayed and received by consumer2
consumer1.close();
Map<Message, Consumer> messages2 = Maps.newHashMap();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer2.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages2.put(msg, consumer2);
consumer2.acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
messages2.put(msg, consumer2);
consumer2.acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
if (messages2.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
}
}
......@@ -488,25 +515,33 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
});
// now: dispatcher must be unblocked: wait to start dispatching-async
Thread.sleep(2000);
Thread.sleep(1000);
result.clear();
messages1.values().forEach(s -> result.addAll(s));
// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages1.size();
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages1.putIfAbsent(consumers[i], Sets.newHashSet());
messages1.get(consumers[i]).add(msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
result.add(msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
}
if (result.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
}
}
result.clear();
messages1.values().forEach(s -> result.addAll(s));
// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, result.size());
producer.close();
......@@ -737,7 +772,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
/*****
* (1) try to consume messages: without acking messages and dispatcher will be blocked once it reaches
* maxUnAckPerBroker limit
......@@ -753,7 +788,6 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
//assertEquals(messages1.size(), maxUnAckPerBroker, 2 * receiverQueueSize);
assertNotEquals(messages1.size(), totalProducedMsgs);
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl consumer2Sub1 = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriberName1, conf);
......@@ -908,7 +942,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> pulsar.getBrokerService().checkUnAckMessageDispatching(), 10, 10,
TimeUnit.MILLISECONDS);
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
new ProducerConfiguration());
......@@ -933,7 +967,6 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
//assertEquals(messages1.size(), maxUnAckPerBroker, 2 * receiverQueueSize);
assertNotEquals(messages1.size(), totalProducedMsgs);
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl consumer2Sub1 = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriberName1, conf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册