未验证 提交 67ccb7f7 编写于 作者: youlixishia's avatar youlixishia 提交者: GitHub

Merge pull request #2729 from wz2cool/master

[ISSUE #2726] retrying by customizing response code
...@@ -82,7 +82,6 @@ import org.apache.rocketmq.common.message.MessageId; ...@@ -82,7 +82,6 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
...@@ -643,20 +642,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -643,20 +642,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString()); log.warn(msg.toString());
exception = e; exception = e;
switch (e.getResponseCode()) { if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
case ResponseCode.TOPIC_NOT_EXIST: continue;
case ResponseCode.SERVICE_NOT_AVAILABLE: } else {
case ResponseCode.SYSTEM_ERROR: if (sendResult != null) {
case ResponseCode.NO_PERMISSION: return sendResult;
case ResponseCode.NO_BUYER_ID: }
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e; throw e;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
......
...@@ -16,8 +16,11 @@ ...@@ -16,8 +16,11 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
...@@ -39,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; ...@@ -39,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
...@@ -63,6 +67,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -63,6 +67,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/ */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl; protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));
/** /**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved. </p> * important when transactional messages are involved. </p>
...@@ -958,6 +971,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -958,6 +971,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor); this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
} }
/**
* Add response code for retrying.
*
* @param responseCode response code, {@link ResponseCode}
*/
public void addRetryResponseCode(int responseCode) {
this.retryResponseCodes.add(responseCode);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException { private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch; MessageBatch msgBatch;
try { try {
...@@ -1088,4 +1110,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -1088,4 +1110,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return traceDispatcher; return traceDispatcher;
} }
public Set<Integer> getRetryResponseCodes() {
return retryResponseCodes;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册