提交 f6300c09 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketMQTemplate优化

上级 900f58ce
......@@ -21,23 +21,27 @@ public class RocketMQTemplate {
private static final InternalLogger log = ClientLogger.getLog();
private static final String DEFAULT_PRODUCER_GROUP= "common-producer";
private static final String DEFAULT_PRODUCER_GROUP= "default-producer";
private static final int DEFAULT_QUEUE_NUM = 4;
private static final int DEFAULT_SEND_MSG_TIMEOUT = 3000;
private static Map<String, DefaultMQProducer> producerMap = new HashMap<>();
public static DefaultMQProducer getProducer() throws Exception {
return getProducer(4, DEFAULT_PRODUCER_GROUP);
return getProducer(DEFAULT_QUEUE_NUM, DEFAULT_PRODUCER_GROUP, DEFAULT_SEND_MSG_TIMEOUT);
}
public static DefaultMQProducer getProducer(int queueNum) throws Exception {
return getProducer(queueNum, DEFAULT_PRODUCER_GROUP);
return getProducer(queueNum, DEFAULT_PRODUCER_GROUP, DEFAULT_SEND_MSG_TIMEOUT);
}
public static DefaultMQProducer getProducer(String producerGroup) throws Exception {
return getProducer(4, producerGroup);
public static DefaultMQProducer getProducer(String producerGroup, int sendMsgTimeout) throws Exception {
return getProducer(DEFAULT_QUEUE_NUM, producerGroup, sendMsgTimeout);
}
public static DefaultMQProducer getProducer(int queueNum, String producerGroup) throws Exception {
public static DefaultMQProducer getProducer(int queueNum, String producerGroup, int sendMsgTimeout) throws Exception {
if (queueNum < 1) throw new IllegalArgumentException("queueNum must >= 1");
if (StringUtils.isBlank(producerGroup)) throw new IllegalArgumentException("producerGroup cannot be null");
String producerKey = producerGroup + queueNum;
......@@ -46,6 +50,7 @@ public class RocketMQTemplate {
if (producerMap.get(producerKey) == null) {
DefaultMQProducer producer = new DefaultMQProducer(DEFAULT_PRODUCER_GROUP);
producer.setDefaultTopicQueueNums(queueNum);
producer.setSendMsgTimeout(sendMsgTimeout);
producer.start();
producerMap.put(producerKey, producer);
}
......@@ -55,20 +60,24 @@ public class RocketMQTemplate {
}
public static void send(String topic, String body) {
send(DEFAULT_PRODUCER_GROUP, topic, body);
send(DEFAULT_PRODUCER_GROUP, topic, body, DEFAULT_SEND_MSG_TIMEOUT);
}
public static void send(String topic, String body, int sendMsgTimeout) {
send(DEFAULT_PRODUCER_GROUP, topic, body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String body) {
send(producerGroup, topic, "", body);
public static void send(String producerGroup, String topic, String body, int sendMsgTimeout) {
send(producerGroup, topic, "", body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String tags, String body) {
send(producerGroup, topic, tags, "", body);
public static void send(String producerGroup, String topic, String tags, String body, int sendMsgTimeout) {
send(producerGroup, topic, tags, "", body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String tags, String keys, String body) {
public static void send(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
try {
send(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
send(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
......@@ -76,17 +85,21 @@ public class RocketMQTemplate {
}
}
private static void send(String producerGroup, Message message) throws Exception {
getProducer(producerGroup).send(message);
private static void send(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
getProducer(producerGroup, sendMsgTimeout).send(message);
}
public static void sendOrderly(String producerGroup, String topic, String keys, String body) {
sendOrderly(producerGroup, topic, keys, body);
sendOrderly(producerGroup, topic, keys, body, DEFAULT_SEND_MSG_TIMEOUT);
}
public static void sendOrderly(String producerGroup, String topic, String keys, String body, int sendMsgTimeout) {
sendOrderly(producerGroup, topic, keys, body, sendMsgTimeout);
}
public static void sendOrderly(String producerGroup, String topic, String tags, String keys, String body) {
public static void sendOrderly(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
try {
sendOrderly(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
sendOrderly(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
......@@ -94,8 +107,8 @@ public class RocketMQTemplate {
}
}
private static void sendOrderly(String producerGroup, Message message) throws Exception {
getProducer(producerGroup).send(message,
private static void sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
getProducer(producerGroup, sendMsgTimeout).send(message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
......
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.rocketmq.common.MixAll;
import org.junit.Before;
import org.junit.Test;
/**
......@@ -11,17 +9,14 @@ public class RocketMQTest {
private static final String TEST_TOPIC = "sdk-test";
@Before
public void namesrv() {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
}
// -Drocketmq.namesrv.addr=127.0.0.1:9876
@Test
public void send() {
int count = 0;
for (int i = 0; i < 20; i++) {
try {
RocketMQTemplate.send(TEST_TOPIC, "hello" + i);
RocketMQTemplate.send(TEST_TOPIC, "hello" + i, 1000);
count++;
} catch (Exception e) {
e.printStackTrace();
......
......@@ -77,7 +77,7 @@ public class MQEventProcessor extends AbstractEventProcessor {
tags = tags.substring(0, tags.length() - suffix.length());
}
RocketMQTemplate.sendOrderly("otter-producer", topic, tags, msgKey, JSON.toJSONString(otterData));
RocketMQTemplate.sendOrderly("otter-producer", topic, tags, msgKey, JSON.toJSONString(otterData), 3000);
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册