diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 00ee3b0a67f562e8527311a834e97a5c4605362a..bdc103f13280f78e44fabc5abd980cdcf3247ce0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -82,7 +82,6 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; @@ -643,20 +642,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; - switch (e.getResponseCode()) { - case ResponseCode.TOPIC_NOT_EXIST: - case ResponseCode.SERVICE_NOT_AVAILABLE: - case ResponseCode.SYSTEM_ERROR: - case ResponseCode.NO_PERMISSION: - case ResponseCode.NO_BUYER_ID: - case ResponseCode.NOT_IN_CURRENT_UNIT: - continue; - default: - if (sendResult != null) { - return sendResult; - } + if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) { + continue; + } else { + if (sendResult != null) { + return sendResult; + } - throw e; + throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 50ca82b1b7bf99a2ec24c23e4c08eeae81cb1158..1af416b48c7a5d87c4b54de76617d29783842b0b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -16,8 +16,11 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -39,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -63,6 +67,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; private final InternalLogger log = ClientLogger.getLog(); + private final Set retryResponseCodes = new CopyOnWriteArraySet(Arrays.asList( + ResponseCode.TOPIC_NOT_EXIST, + ResponseCode.SERVICE_NOT_AVAILABLE, + ResponseCode.SYSTEM_ERROR, + ResponseCode.NO_PERMISSION, + ResponseCode.NO_BUYER_ID, + ResponseCode.NOT_IN_CURRENT_UNIT + )); + /** * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * important when transactional messages are involved.

@@ -958,6 +971,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor); } + /** + * Add response code for retrying. + * + * @param responseCode response code, {@link ResponseCode} + */ + public void addRetryResponseCode(int responseCode) { + this.retryResponseCodes.add(responseCode); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { @@ -1088,4 +1110,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return traceDispatcher; } + public Set getRetryResponseCodes() { + return retryResponseCodes; + } }