提交 02198296 编写于 作者: M Matteo Merli 提交者: GitHub

Fixed race condition with NPE on Producer.closeAsync() (#490)

上级 d9c92e72
......@@ -406,7 +406,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
stats.cancelStatsTimeout();
if (getClientCnx() == null || currentState != State.Ready) {
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
synchronized (this) {
setState(State.Closed);
......@@ -428,7 +429,6 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
if (exception == null || !cnx.ctx().channel().isActive()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册