From bd4a83016d56f8afbbee6e5a84bb05c6c994ce68 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Sat, 5 Sep 2020 08:13:18 -0700 Subject: [PATCH] Implement better error handling during close() of a batch source (#7984) Co-authored-by: Jerry Peng --- .../source/batch/BatchSourceExecutor.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java index c42d22cd60e..4474b9b391a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java @@ -175,16 +175,40 @@ public class BatchSourceExecutor implements Source { } private void stop() throws Exception { + Exception ex = null; if (discoveryTriggerer != null) { - discoveryTriggerer.stop(); + try { + discoveryTriggerer.stop(); + } catch (Exception e) { + log.error("Encountered exception when closing Batch Source Triggerer", e); + ex = e; + } discoveryTriggerer = null; } if (intermediateTopicConsumer != null) { - intermediateTopicConsumer.close(); + try { + intermediateTopicConsumer.close(); + } catch (Exception e) { + log.error("Encountered exception when closing intermediate topic of Batch Source", e); + if (ex != null) { + ex = e; + } + } intermediateTopicConsumer = null; } if (batchSource != null) { - batchSource.close(); + try { + batchSource.close(); + } catch (Exception e) { + log.error("Encountered exception when closing Batch Source", e); + if (ex != null) { + ex = e; + } + } + } + + if (ex != null) { + throw ex; } } -- GitLab