From 121ed54516983dd0ee01b7aafb1f26b1867cd5cb Mon Sep 17 00:00:00 2001 From: Yuki Shiga Date: Thu, 15 Jun 2017 04:45:25 +0900 Subject: [PATCH] Internal Producer/Consumer in WebSocketProxy should be closed asynchronously (#473) --- .../com/yahoo/pulsar/websocket/ConsumerHandler.java | 11 +++++++++-- .../com/yahoo/pulsar/websocket/ProducerHandler.java | 9 ++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 785dcc75108..1c442e05673 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -135,7 +135,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler { @Override public void writeSuccess() { if (log.isDebugEnabled()) { - log.info("[{}/{}] message is delivered successfully to {} ", consumer.getTopic(), + log.debug("[{}/{}] message is delivered successfully to {} ", consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString()); } updateDeliverMsgStat(msgSize); @@ -184,7 +184,14 @@ public class ConsumerHandler extends AbstractWebSocketHandler { public void close() throws IOException { if (consumer != null) { this.service.removeConsumer(this); - consumer.close(); + consumer.closeAsync().thenAccept(x -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Closed consumer asynchronously", consumer.getTopic()); + } + }).exceptionally(exception -> { + log.warn("[{}] Failed to close consumer", consumer.getTopic(), exception); + return null; + }); } } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index 1053eb045f1..f7fc5cd3559 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -83,7 +83,14 @@ public class ProducerHandler extends AbstractWebSocketHandler { public void close() throws IOException { if (producer != null) { this.service.removeProducer(this); - producer.close(); + producer.closeAsync().thenAccept(x -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Closed producer asynchronously", producer.getTopic()); + } + }).exceptionally(exception -> { + log.warn("[{}] Failed to close producer", producer.getTopic(), exception); + return null; + }); } } -- GitLab