提交 fb3a8bc0 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketMQMessageListener增加reconsumeTimes属性,用来控制消费失败重试次数

上级 c1a15d1b
......@@ -176,6 +176,7 @@ public class RocketMQAutoConfiguration {
beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
beanBuilder.addPropertyValue(PROP_RECONSUME_TIMES, annotation.reconsumeTimes());
beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
......
......@@ -80,4 +80,9 @@ public @interface RocketMQMessageListener {
*/
int consumeThreadMax() default 64;
/**
* Re-consume times for concurrent/orderly message
*/
int reconsumeTimes() default 0;
}
......@@ -96,6 +96,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
@Getter
private int consumeThreadMax = 64;
@Setter
@Getter
private int reconsumeTimes = 0;
@Getter
@Setter
private String charset = "UTF-8";
......@@ -180,9 +184,14 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
if (reconsumeTimes == 0) {
log.warn("consume message failed, and will not retry. messageExt:{}", messageExt, e);
} else if (reconsumeTimes <= -1 || messageExt.getReconsumeTimes() < reconsumeTimes) {
log.warn("consume message failed, reconsumeTimes:{}. messageExt:{}",
messageExt.getReconsumeTimes(), messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
......@@ -273,6 +282,9 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
}
consumer.setMessageModel(messageModel);
if (reconsumeTimes > 0) {
consumer.setMaxReconsumeTimes(reconsumeTimes);
}
switch (selectorType) {
case TAG:
......
......@@ -27,6 +27,7 @@ public final class DefaultRocketMQListenerContainerConstants {
public static final String PROP_CONSUMER_GROUP = "consumerGroup";
public static final String PROP_CONSUME_MODE = "consumeMode";
public static final String PROP_CONSUME_THREAD_MAX = "consumeThreadMax";
public static final String PROP_RECONSUME_TIMES = "reconsumeTimes";
public static final String PROP_MESSAGE_MODEL = "messageModel";
public static final String PROP_SELECTOR_EXPRESS = "selectorExpress";
public static final String PROP_SELECTOR_TYPE = "selectorType";
......
package org.hongxi.whatsmars.rocketmq.boot.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-4", consumerGroup = "my-consumer_test-topic-4",
consumeMode = ConsumeMode.ORDERLY)
public class MyConsumer4 implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("received message: " + message);
int a = 1 / 0;
}
}
\ No newline at end of file
package org.hongxi.whatsmars.rocketmq.boot.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-4", consumerGroup = "my-consumer_test-topic-5",
consumeMode = ConsumeMode.ORDERLY, reconsumeTimes = 3)
public class MyConsumer5 implements RocketMQListener<MessageExt> {
public void onMessage(MessageExt messageExt) {
log.info("received message: " + messageExt);
int a = 1 / 0;
}
}
\ No newline at end of file
......@@ -21,16 +21,18 @@ public class ProducerApplication implements CommandLineRunner {
}
public void run(String... args) throws Exception {
for (int i = 0; i < 20; i++) {
for (int i = 0; i < 5; i++) {
try {
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("I'm delayed message").build(), MessageDelayLevel.TIME_1M);
} catch (Exception e) {
e.printStackTrace();
}
}
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("I'm delayed message").build(), MessageDelayLevel.TIME_1M);
rocketMQTemplate.syncSendOrderly("test-topic-4", MessageBuilder.withPayload("I'm order message").build(), "1234");
System.out.println("send finished!");
// rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册