未验证 提交 1558e052 编写于 作者: X xu 提交者: GitHub

[ISSUE #1857] Refactor the duplicated code

close #1857 
上级 3974677f
...@@ -1367,16 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1367,16 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
}, timeout - cost); }, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); return waitResponse(msg, timeout, requestResponseFuture, cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally { } finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId); RequestFutureTable.getRequestFutureTable().remove(correlationId);
} }
...@@ -1432,16 +1423,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1432,16 +1423,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
}, timeout - cost); }, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); return waitResponse(msg, timeout, requestResponseFuture, cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally { } finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId); RequestFutureTable.getRequestFutureTable().remove(correlationId);
} }
...@@ -1498,21 +1480,25 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1498,21 +1480,25 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
}, null, timeout - cost); }, null, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); return waitResponse(msg, timeout, requestResponseFuture, cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally { } finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId); RequestFutureTable.getRequestFutureTable().remove(correlationId);
} }
} }
private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
}
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException { throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis(); long beginTimestamp = System.currentTimeMillis();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册