From 1558e052018ae12d273e175dd3c6ba004e9e0995 Mon Sep 17 00:00:00 2001 From: xu <774590465@qq.com> Date: Mon, 16 Mar 2020 20:05:32 +0800 Subject: [PATCH] [ISSUE #1857] Refactor the duplicated code close #1857 --- .../impl/producer/DefaultMQProducerImpl.java | 46 +++++++------------ 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index fca50cc5..6ca4b724 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1367,16 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } @@ -1432,16 +1423,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } @@ -1498,21 +1480,25 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, null, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } } + private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException { + Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); + if (responseMessage == null) { + if (requestResponseFuture.isSendRequestOk()) { + throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + } else { + throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); + } + } + return responseMessage; + } + public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); -- GitLab