提交 ea47f7d3 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketMQTemplate优化(+SendResult)

上级 f6300c09
......@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
......@@ -22,17 +23,17 @@ public class RocketMQTemplate {
this.defaultMQProducer = producer;
}
public void send(String topic, String body) {
send(topic, "", body);
public SendResult send(String topic, String body) {
return send(topic, "", body);
}
public void send(String topic, String tags, String body) {
send(topic, tags, "", body);
public SendResult send(String topic, String tags, String body) {
return send(topic, tags, "", body);
}
public void send(String topic, String tags, String keys, String body) {
public SendResult send(String topic, String tags, String keys, String body) {
try {
send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
return send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
} catch (Exception e) {
log.error("send error, topic:{}, tags:{}, keys:{}, body:{}",
topic, tags, keys, body, e);
......@@ -40,18 +41,19 @@ public class RocketMQTemplate {
}
}
private void send(Message message) throws Exception {
this.defaultMQProducer.send(message);
log.info("send successful:{}", message);
private SendResult send(Message message) throws Exception {
SendResult sendResult = this.defaultMQProducer.send(message);
log.debug("send result: {}", sendResult);
return sendResult;
}
public void sendOrderly(String topic, String keys, String body) {
sendOrderly(topic, "", keys, body);
public SendResult sendOrderly(String topic, String keys, String body) {
return sendOrderly(topic, "", keys, body);
}
public void sendOrderly(String topic, String tags, String keys, String body) {
public SendResult sendOrderly(String topic, String tags, String keys, String body) {
try {
sendOrderly(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
return sendOrderly(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
} catch (Exception e) {
log.error("send error, topic:{}, tags:{}, keys:{}, body:{}",
topic, tags, keys, body, e);
......@@ -59,8 +61,8 @@ public class RocketMQTemplate {
}
}
private void sendOrderly(Message message) throws Exception {
this.defaultMQProducer.send(message,
private SendResult sendOrderly(Message message) throws Exception {
SendResult sendResult = this.defaultMQProducer.send(message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
......@@ -69,7 +71,8 @@ public class RocketMQTemplate {
return mqs.get(index);
}
}, message.getKeys());
log.info("send successful:{}", message);
log.debug("send result: {}", sendResult);
return sendResult;
}
}
package org.hongxi.whatsmars.rocketmq.spring.demo;
import org.apache.rocketmq.client.producer.SendResult;
import org.hongxi.whatsmars.rocketmq.config.spring.RocketMQTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
......@@ -10,7 +11,8 @@ public class Producer {
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) context.getBean("rocketMQTemplate");
for (int i = 0; i < 20; i++) {
try {
rocketMQTemplate.send("sdk-test", "rocketMQTemplate");
SendResult sendResult = rocketMQTemplate.send("sdk-test", "rocketMQTemplate");
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -5,6 +5,7 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -59,25 +60,25 @@ public class RocketMQTemplate {
return producerMap.get(producerKey);
}
public static void send(String topic, String body) {
send(DEFAULT_PRODUCER_GROUP, topic, body, DEFAULT_SEND_MSG_TIMEOUT);
public static SendResult send(String topic, String body) {
return send(DEFAULT_PRODUCER_GROUP, topic, body, DEFAULT_SEND_MSG_TIMEOUT);
}
public static void send(String topic, String body, int sendMsgTimeout) {
send(DEFAULT_PRODUCER_GROUP, topic, body, sendMsgTimeout);
public static SendResult send(String topic, String body, int sendMsgTimeout) {
return send(DEFAULT_PRODUCER_GROUP, topic, body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String body, int sendMsgTimeout) {
send(producerGroup, topic, "", body, sendMsgTimeout);
public static SendResult send(String producerGroup, String topic, String body, int sendMsgTimeout) {
return send(producerGroup, topic, "", body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String tags, String body, int sendMsgTimeout) {
send(producerGroup, topic, tags, "", body, sendMsgTimeout);
public static SendResult send(String producerGroup, String topic, String tags, String body, int sendMsgTimeout) {
return send(producerGroup, topic, tags, "", body, sendMsgTimeout);
}
public static void send(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
public static SendResult send(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
try {
send(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
return send(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
......@@ -85,21 +86,21 @@ public class RocketMQTemplate {
}
}
private static void send(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
getProducer(producerGroup, sendMsgTimeout).send(message);
private static SendResult send(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
return getProducer(producerGroup, sendMsgTimeout).send(message);
}
public static void sendOrderly(String producerGroup, String topic, String keys, String body) {
sendOrderly(producerGroup, topic, keys, body, DEFAULT_SEND_MSG_TIMEOUT);
public static SendResult sendOrderly(String producerGroup, String topic, String keys, String body) {
return sendOrderly(producerGroup, topic, keys, body, DEFAULT_SEND_MSG_TIMEOUT);
}
public static void sendOrderly(String producerGroup, String topic, String keys, String body, int sendMsgTimeout) {
sendOrderly(producerGroup, topic, keys, body, sendMsgTimeout);
public static SendResult sendOrderly(String producerGroup, String topic, String keys, String body, int sendMsgTimeout) {
return sendOrderly(producerGroup, topic, keys, body, sendMsgTimeout);
}
public static void sendOrderly(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
public static SendResult sendOrderly(String producerGroup, String topic, String tags, String keys, String body, int sendMsgTimeout) {
try {
sendOrderly(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
return sendOrderly(producerGroup, new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)), sendMsgTimeout);
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
......@@ -107,8 +108,8 @@ public class RocketMQTemplate {
}
}
private static void sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
getProducer(producerGroup, sendMsgTimeout).send(message,
private static SendResult sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
return getProducer(producerGroup, sendMsgTimeout).send(message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
......
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
/**
......@@ -16,13 +17,13 @@ public class RocketMQTest {
int count = 0;
for (int i = 0; i < 20; i++) {
try {
RocketMQTemplate.send(TEST_TOPIC, "hello" + i, 1000);
SendResult sendResult = RocketMQTemplate.send(TEST_TOPIC, "hello" + i, 1000);
System.out.println(sendResult);
count++;
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("send successful: " + count);
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册