diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index c079d6590d9cad410e15523d43dd8de7fdb5025d..3192981d4ea2a11504027bcba2772dba979f0c93 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -145,6 +145,107 @@ public class InterceptorsTest extends ProducerConsumerBase { producer.close(); } + @Test + public void testProducerInterceptorsWithErrors() throws PulsarClientException { + ProducerInterceptor interceptor = new ProducerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeSend(Producer producer, Message message) { + throw new AbstractMethodError(); + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) { + throw new AbstractMethodError(); + } + }; + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .intercept(interceptor) + .create(); + + MessageId messageId = producer.newMessage().value("Hello Pulsar!").send(); + Assert.assertNotNull(messageId); + producer.close(); + } + + @Test + public void testConsumerInterceptorWithErrors() throws PulsarClientException { + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() { + throw new AbstractMethodError(); + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + throw new AbstractMethodError(); + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + throw new AbstractMethodError(); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable exception) { + throw new AbstractMethodError(); + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + throw new AbstractMethodError(); + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + throw new AbstractMethodError(); + } + }; + Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic-exception") + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription-ack-timeout") + .ackTimeout(3, TimeUnit.SECONDS) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic-exception") + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription-negative") + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic-exception") + .create(); + + producer.newMessage().value("Hello Pulsar!").send(); + + Message received = consumer1.receive(); + Assert.assertEquals(received.getValue(), "Hello Pulsar!"); + // wait ack timeout + Message receivedAgain = consumer1.receive(); + Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!"); + consumer1.acknowledge(receivedAgain); + + received = consumer2.receive(); + Assert.assertEquals(received.getValue(), "Hello Pulsar!"); + consumer2.negativeAcknowledge(received); + receivedAgain = consumer2.receive(); + Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!"); + consumer2.acknowledge(receivedAgain); + + producer.close(); + consumer1.close(); + consumer2.close(); + } + @Test public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException { ConsumerInterceptor interceptor = new ConsumerInterceptor() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java index e5bf834cd179f75d5d4944b93408d624910bf113..91b322826b9dd3a22cd129f65471407e3a0c7f76 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java @@ -66,7 +66,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage); - } catch (Exception e) { + } catch (Throwable e) { if (consumer != null) { log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", consumer.getTopic(), consumer.getConsumerName(), e); } else { @@ -92,7 +92,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptors.get(i).onAcknowledge(consumer, messageId, exception); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Error executing interceptor onAcknowledge callback ", e); } } @@ -113,7 +113,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Error executing interceptor onAcknowledgeCumulative callback ", e); } } @@ -134,7 +134,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptors.get(i).onNegativeAcksSend(consumer, messageIds); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Error executing interceptor onNegativeAcksSend callback", e); } } @@ -155,7 +155,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptors.get(i).onAckTimeoutSend(consumer, messageIds); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Error executing interceptor onAckTimeoutSend callback", e); } } @@ -166,7 +166,7 @@ public class ConsumerInterceptors implements Closeable { for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { try { interceptors.get(i).close(); - } catch (Exception e) { + } catch (Throwable e) { log.error("Fail to close consumer interceptor ", e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java index c70e4a3aa38361c9b4be2da6697f0748159db24b..68fa54b705d6cd769a61840ba71e65f99d9c1725 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java @@ -63,7 +63,7 @@ public class ProducerInterceptors implements Closeable { for (int i = 0; i < interceptors.size(); i++) { try { interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage); - } catch (Exception e) { + } catch (Throwable e) { if (producer != null) { log.warn("Error executing interceptor beforeSend callback for topicName:{} ", producer.getTopic(), e); } else { @@ -91,7 +91,7 @@ public class ProducerInterceptors implements Closeable { for (int i = 0; i < interceptors.size(); i++) { try { interceptors.get(i).onSendAcknowledgement(producer, message, msgId, exception); - } catch (Exception e) { + } catch (Throwable e) { log.warn("Error executing interceptor onSendAcknowledgement callback ", e); } } @@ -102,7 +102,7 @@ public class ProducerInterceptors implements Closeable { for (int i = 0; i < interceptors.size(); i++) { try { interceptors.get(i).close(); - } catch (Exception e) { + } catch (Throwable e) { log.error("Fail to close producer interceptor ", e); } }