diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2f9146d910fb3487beb71245fe185162ce4d51a1..49e10eb1eb156d1e3191c4c754592ff7f7958a61 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -620,20 +620,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; - switch (e.getResponseCode()) { - case ResponseCode.TOPIC_NOT_EXIST: - case ResponseCode.SERVICE_NOT_AVAILABLE: - case ResponseCode.SYSTEM_ERROR: - case ResponseCode.NO_PERMISSION: - case ResponseCode.NO_BUYER_ID: - case ResponseCode.NOT_IN_CURRENT_UNIT: - continue; - default: - if (sendResult != null) { - return sendResult; - } + if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) { + continue; + } else { + if (sendResult != null) { + return sendResult; + } - throw e; + throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 24caf140544e26b94e40b45e66bfed2663a9a801..33d0ecc272cc49df8a37d96c5a9daaaaff0d7599 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -16,8 +16,11 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -38,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -62,6 +66,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; private final InternalLogger log = ClientLogger.getLog(); + private final Set retryResponseCodes = new CopyOnWriteArraySet(Arrays.asList( + ResponseCode.TOPIC_NOT_EXIST, + ResponseCode.SERVICE_NOT_AVAILABLE, + ResponseCode.SYSTEM_ERROR, + ResponseCode.NO_PERMISSION, + ResponseCode.NO_BUYER_ID, + ResponseCode.NOT_IN_CURRENT_UNIT + )); + /** * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * important when transactional messages are involved.

@@ -960,6 +973,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor); } + /** + * Add response code for retrying. + * + * @param responseCode response code, {@link ResponseCode} + */ + public void addRetryResponseCode(int responseCode) { + this.retryResponseCodes.add(responseCode); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { @@ -1090,4 +1112,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return traceDispatcher; } + public Set getRetryResponseCodes() { + return retryResponseCodes; + } }