From 0363be174112b3d9b2478d9869d497bb764e6cde Mon Sep 17 00:00:00 2001
From: wz2cool
Date: Sun, 14 Mar 2021 13:00:50 +0800
Subject: [PATCH] add response code for retrying
---
.../impl/producer/DefaultMQProducerImpl.java | 20 ++++++---------
.../client/producer/DefaultMQProducer.java | 25 +++++++++++++++++++
2 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2f9146d9..49e10eb1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -620,20 +620,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
- switch (e.getResponseCode()) {
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- default:
- if (sendResult != null) {
- return sendResult;
- }
+ if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
+ continue;
+ } else {
+ if (sendResult != null) {
+ return sendResult;
+ }
- throw e;
+ throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 24caf140..33d0ecc2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -16,8 +16,11 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
@@ -38,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
@@ -62,6 +66,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
+ private final Set retryResponseCodes = new CopyOnWriteArraySet(Arrays.asList(
+ ResponseCode.TOPIC_NOT_EXIST,
+ ResponseCode.SERVICE_NOT_AVAILABLE,
+ ResponseCode.SYSTEM_ERROR,
+ ResponseCode.NO_PERMISSION,
+ ResponseCode.NO_BUYER_ID,
+ ResponseCode.NOT_IN_CURRENT_UNIT
+ ));
+
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
@@ -960,6 +973,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
}
+ /**
+ * Add response code for retrying.
+ *
+ * @param responseCode response code, {@link ResponseCode}
+ */
+ public void addRetryResponseCode(int responseCode) {
+ this.retryResponseCodes.add(responseCode);
+ }
+
private MessageBatch batch(Collection msgs) throws MQClientException {
MessageBatch msgBatch;
try {
@@ -1090,4 +1112,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return traceDispatcher;
}
+ public Set getRetryResponseCodes() {
+ return retryResponseCodes;
+ }
}
--
GitLab