提交 0363be17 编写于 作者: wz3cool's avatar wz3cool

add response code for retrying

上级 3ae25175
......@@ -620,15 +620,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
default:
} else {
if (sendResult != null) {
return sendResult;
}
......
......@@ -16,8 +16,11 @@
*/
package org.apache.rocketmq.client.producer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
......@@ -38,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -62,6 +66,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved. </p>
......@@ -960,6 +973,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
}
/**
* Add response code for retrying.
*
* @param responseCode response code, {@link ResponseCode}
*/
public void addRetryResponseCode(int responseCode) {
this.retryResponseCodes.add(responseCode);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
......@@ -1090,4 +1112,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return traceDispatcher;
}
public Set<Integer> getRetryResponseCodes() {
return retryResponseCodes;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册