diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java index c42d22cd60e3ed31fb6b7858d30fd2053bd3f79e..4474b9b391ad4a9d7ca2894036acec80b99a08be 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java @@ -175,16 +175,40 @@ public class BatchSourceExecutor implements Source { } private void stop() throws Exception { + Exception ex = null; if (discoveryTriggerer != null) { - discoveryTriggerer.stop(); + try { + discoveryTriggerer.stop(); + } catch (Exception e) { + log.error("Encountered exception when closing Batch Source Triggerer", e); + ex = e; + } discoveryTriggerer = null; } if (intermediateTopicConsumer != null) { - intermediateTopicConsumer.close(); + try { + intermediateTopicConsumer.close(); + } catch (Exception e) { + log.error("Encountered exception when closing intermediate topic of Batch Source", e); + if (ex != null) { + ex = e; + } + } intermediateTopicConsumer = null; } if (batchSource != null) { - batchSource.close(); + try { + batchSource.close(); + } catch (Exception e) { + log.error("Encountered exception when closing Batch Source", e); + if (ex != null) { + ex = e; + } + } + } + + if (ex != null) { + throw ex; } }