diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index fca50cc565ceb398d66f5c1d28d98467396db221..6ca4b72403bf1f4c0f91e3b6498ab6c8a726b0e5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1367,16 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } @@ -1432,16 +1423,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } @@ -1498,21 +1480,25 @@ public class DefaultMQProducerImpl implements MQProducerInner { } }, null, timeout - cost); - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { - throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); - } else { - throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); - } - } - return responseMessage; + return waitResponse(msg, timeout, requestResponseFuture, cost); } finally { RequestFutureTable.getRequestFutureTable().remove(correlationId); } } + private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException { + Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); + if (responseMessage == null) { + if (requestResponseFuture.isSendRequestOk()) { + throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + } else { + throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); + } + } + return responseMessage; + } + public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis();