提交 ae3494d8 编写于 作者: L lipenghui 提交者: Jia Zhai

Catch throwable in interceptors of consumer and producer. (#4860)

(cherry picked from commit 00ce7815)
上级 13c88da1
......@@ -145,6 +145,107 @@ public class InterceptorsTest extends ProducerConsumerBase {
producer.close();
}
@Test
public void testProducerInterceptorsWithErrors() throws PulsarClientException {
ProducerInterceptor<String> interceptor = new ProducerInterceptor<String>() {
@Override
public void close() {
}
@Override
public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
throw new AbstractMethodError();
}
@Override
public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable exception) {
throw new AbstractMethodError();
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.intercept(interceptor)
.create();
MessageId messageId = producer.newMessage().value("Hello Pulsar!").send();
Assert.assertNotNull(messageId);
producer.close();
}
@Test
public void testConsumerInterceptorWithErrors() throws PulsarClientException {
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {
throw new AbstractMethodError();
}
@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
throw new AbstractMethodError();
}
@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
throw new AbstractMethodError();
}
@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable exception) {
throw new AbstractMethodError();
}
@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
throw new AbstractMethodError();
}
@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
throw new AbstractMethodError();
}
};
Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic-exception")
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription-ack-timeout")
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic-exception")
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription-negative")
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic-exception")
.create();
producer.newMessage().value("Hello Pulsar!").send();
Message<String> received = consumer1.receive();
Assert.assertEquals(received.getValue(), "Hello Pulsar!");
// wait ack timeout
Message<String> receivedAgain = consumer1.receive();
Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
consumer1.acknowledge(receivedAgain);
received = consumer2.receive();
Assert.assertEquals(received.getValue(), "Hello Pulsar!");
consumer2.negativeAcknowledge(received);
receivedAgain = consumer2.receive();
Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
consumer2.acknowledge(receivedAgain);
producer.close();
consumer1.close();
consumer2.close();
}
@Test
public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
......
......@@ -66,7 +66,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
} catch (Exception e) {
} catch (Throwable e) {
if (consumer != null) {
log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", consumer.getTopic(), consumer.getConsumerName(), e);
} else {
......@@ -92,7 +92,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledge(consumer, messageId, exception);
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Error executing interceptor onAcknowledge callback ", e);
}
}
......@@ -113,7 +113,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Error executing interceptor onAcknowledgeCumulative callback ", e);
}
}
......@@ -134,7 +134,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Error executing interceptor onNegativeAcksSend callback", e);
}
}
......@@ -155,7 +155,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAckTimeoutSend(consumer, messageIds);
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Error executing interceptor onAckTimeoutSend callback", e);
}
}
......@@ -166,7 +166,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).close();
} catch (Exception e) {
} catch (Throwable e) {
log.error("Fail to close consumer interceptor ", e);
}
}
......
......@@ -63,7 +63,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage);
} catch (Exception e) {
} catch (Throwable e) {
if (producer != null) {
log.warn("Error executing interceptor beforeSend callback for topicName:{} ", producer.getTopic(), e);
} else {
......@@ -91,7 +91,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptors.get(i).onSendAcknowledgement(producer, message, msgId, exception);
} catch (Exception e) {
} catch (Throwable e) {
log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
}
}
......@@ -102,7 +102,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptors.get(i).close();
} catch (Exception e) {
} catch (Throwable e) {
log.error("Fail to close producer interceptor ", e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册