提交 ebc0ede8 编写于 作者: Q qqeasonchen

optimization exception declare of request

上级 a8c9fe68
...@@ -1377,32 +1377,28 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1377,32 +1377,28 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
} }
public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException { public void request(Message msg, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis(); long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp; long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true); requestResponseFuture.setSendReqeustOk(true);
} }
@Override @Override
public void onException(Throwable e) { public void onException(Throwable e) {
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
requestFail(requestUniqId); requestFail(requestUniqId);
} }
}, timeout - cost); }, timeout - cost);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
} }
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
...@@ -1447,32 +1443,29 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1447,32 +1443,29 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public void request(final Message msg, final MessageQueueSelector selector, final Object arg, public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws RemotingException { final RequestCallback requestCallback, final long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis(); long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp; long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true); requestResponseFuture.setSendReqeustOk(true);
} }
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
}
}, timeout - cost);
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
}
}, timeout - cost);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
} }
public Message request(final Message msg, final MessageQueue mq, final long timeout) public Message request(final Message msg, final MessageQueue mq, final long timeout)
...@@ -1516,32 +1509,27 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1516,32 +1509,27 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException { throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis(); long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try { final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp; long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true); requestResponseFuture.setSendReqeustOk(true);
} }
@Override @Override
public void onException(Throwable e) { public void onException(Throwable e) {
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
requestFail(requestUniqId); requestFail(requestUniqId);
} }
}, null, timeout - cost); }, null, timeout - cost);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
} }
private void requestFail(final String requestUniqId) { private void requestFail(final String requestUniqId) {
......
...@@ -579,6 +579,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -579,6 +579,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error. * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/ */
@Override @Override
public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
...@@ -601,10 +602,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -601,10 +602,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/ */
@Override @Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout) public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic())); msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
} }
...@@ -621,6 +623,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -621,6 +623,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error. * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/ */
@Override @Override
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
...@@ -641,11 +644,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -641,11 +644,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/ */
@Override @Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg, public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException { InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic())); msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
} }
...@@ -660,6 +664,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -660,6 +664,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error. * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/ */
@Override @Override
public Message request(final Message msg, final MessageQueue mq, final long timeout) public Message request(final Message msg, final MessageQueue mq, final long timeout)
...@@ -678,10 +683,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -678,10 +683,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error. * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted. * @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/ */
@Override @Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic())); msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
} }
......
...@@ -105,7 +105,7 @@ public interface MQProducer extends MQAdmin { ...@@ -105,7 +105,7 @@ public interface MQProducer extends MQAdmin {
RemotingException, MQBrokerException, InterruptedException; RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout) void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException; throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg, Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
...@@ -113,12 +113,12 @@ public interface MQProducer extends MQAdmin { ...@@ -113,12 +113,12 @@ public interface MQProducer extends MQAdmin {
void request(final Message msg, final MessageQueueSelector selector, final Object arg, void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final RequestCallback requestCallback,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, final long timeout) throws MQClientException, RemotingException,
InterruptedException; InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueue mq, final long timeout) Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException; throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册