diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index e740c834b44562cb2e5a6111912cd57315ccd8ff..c2736bc0cb54caa7bf3b73b7b6f216f43f3cc754 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1377,32 +1377,28 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException { + public void request(Message msg, final RequestCallback requestCallback, long timeout) + throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); - try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); - long cost = System.currentTimeMillis() - beginTimestamp; - this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendReqeustOk(true); - } + long cost = System.currentTimeMillis() - beginTimestamp; + this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } - @Override - public void onException(Throwable e) { - requestResponseFuture.setCause(e); - requestFail(requestUniqId); - } - }, timeout - cost); - } catch (Exception ex) { - log.warn("send request message to <{}> failed.", msg.getTopic(), ex); - throw new RemotingSendRequestException(msg.getTopic(), ex); - } + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, timeout - cost); } public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, @@ -1447,32 +1443,29 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void request(final Message msg, final MessageQueueSelector selector, final Object arg, - final RequestCallback requestCallback, final long timeout) throws RemotingException { + final RequestCallback requestCallback, final long timeout) + throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); - try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); - long cost = System.currentTimeMillis() - beginTimestamp; - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendReqeustOk(true); - } + long cost = System.currentTimeMillis() - beginTimestamp; + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, timeout - cost); - @Override - public void onException(Throwable e) { - requestResponseFuture.setCause(e); - requestFail(requestUniqId); - } - }, timeout - cost); - } catch (Exception ex) { - log.warn("send request message to <{}> failed.", msg.getTopic(), ex); - throw new RemotingSendRequestException(msg.getTopic(), ex); - } } public Message request(final Message msg, final MessageQueue mq, final long timeout) @@ -1516,32 +1509,27 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws RemotingException { + throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); - try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); - long cost = System.currentTimeMillis() - beginTimestamp; - this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendReqeustOk(true); - } + long cost = System.currentTimeMillis() - beginTimestamp; + this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } - @Override - public void onException(Throwable e) { - requestResponseFuture.setCause(e); - requestFail(requestUniqId); - } - }, null, timeout - cost); - } catch (Exception ex) { - log.warn("send request message to <{}> failed.", msg.getTopic(), ex); - throw new RemotingSendRequestException(msg.getTopic(), ex); - } + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, null, timeout - cost); } private void requestFail(final String requestUniqId) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index fdb489eebd6b11d1eff5f6f3220c839cfc39ca81..faa79f54c93924eb5f6841698c7ad53e98cd670b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -579,6 +579,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws RemotingException if there is any network-tier error. * @throws MQBrokerException if there is any broker error. * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. */ @Override public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, @@ -601,10 +602,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final RequestCallback requestCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { + throws MQClientException, RemotingException, InterruptedException, MQBrokerException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.request(msg, requestCallback, timeout); } @@ -621,6 +623,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws RemotingException if there is any network-tier error. * @throws MQBrokerException if there is any broker error. * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. */ @Override public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, @@ -641,11 +644,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final MessageQueueSelector selector, final Object arg, final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, - InterruptedException { + InterruptedException, MQBrokerException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout); } @@ -660,6 +664,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws RemotingException if there is any network-tier error. * @throws MQBrokerException if there is any broker error. * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. */ @Override public Message request(final Message msg, final MessageQueue mq, final long timeout) @@ -678,10 +683,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException, MQBrokerException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index a837c3d232d6c1eeb5f2e79819ac12c56de32fbb..c6cf4c93596578e958de17815fa9604e2c09b7fe 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -105,7 +105,7 @@ public interface MQProducer extends MQAdmin { RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final RequestCallback requestCallback, final long timeout) - throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException; + throws MQClientException, RemotingException, InterruptedException, MQBrokerException; Message request(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, @@ -113,12 +113,12 @@ public interface MQProducer extends MQAdmin { void request(final Message msg, final MessageQueueSelector selector, final Object arg, final RequestCallback requestCallback, - final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, - InterruptedException; + final long timeout) throws MQClientException, RemotingException, + InterruptedException, MQBrokerException; Message request(final Message msg, final MessageQueue mq, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) - throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException; + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; }