提交 f5ba9dab 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketTemplate优化

上级 d3da494b
......@@ -20,6 +20,10 @@ public abstract class BaseConsumer {
protected DefaultMQPushConsumer consumer;
protected void startConsume(String consumerGroup, String topic) throws MQClientException {
startConsume(consumerGroup, topic, "*");
}
protected void startConsume(String consumerGroup, String topic, String tags) throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
......
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
/**
* Created by shenhongxi on 2018/12/12.
*/
public class MessagingException extends RuntimeException {
private static final long serialVersionUID = -5758410930844185841L;
private int responseCode;
private String errorMessage;
public MessagingException(String errorMessage, Throwable cause) {
super(FAQUrl.attachDefaultURL(errorMessage), cause);
this.responseCode = -1;
this.errorMessage = errorMessage;
}
public MessagingException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ errorMessage));
this.responseCode = responseCode;
this.errorMessage = errorMessage;
}
public int getResponseCode() {
return responseCode;
}
public MessagingException setResponseCode(final int responseCode) {
this.responseCode = responseCode;
return this;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(final String errorMessage) {
this.errorMessage = errorMessage;
}
}
......@@ -2,7 +2,6 @@ package org.hongxi.whatsmars.common.rocketmq;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
......@@ -18,7 +17,7 @@ import java.util.Map;
/**
* Created by shenhongxi on 2018/12/11.
*/
public class RocketMQTemplate {
public class RocketTemplate {
private static final InternalLogger log = ClientLogger.getLog();
......@@ -26,19 +25,19 @@ public class RocketMQTemplate {
private static Map<String, DefaultMQProducer> producerMap = new HashMap<>();
public static DefaultMQProducer getProducer() throws MQClientException {
public static DefaultMQProducer getProducer() throws Exception {
return getProducer(4, DEFAULT_PRODUCER_GROUP);
}
public static DefaultMQProducer getProducer(int queueNum) throws MQClientException {
public static DefaultMQProducer getProducer(int queueNum) throws Exception {
return getProducer(queueNum, DEFAULT_PRODUCER_GROUP);
}
public static DefaultMQProducer getProducer(String producerGroup) throws MQClientException {
public static DefaultMQProducer getProducer(String producerGroup) throws Exception {
return getProducer(4, producerGroup);
}
public static DefaultMQProducer getProducer(int queueNum, String producerGroup) throws MQClientException {
public static DefaultMQProducer getProducer(int queueNum, String producerGroup) throws Exception {
if (queueNum < 1) throw new IllegalArgumentException("queueNum must >= 1");
if (StringUtils.isBlank(producerGroup)) throw new IllegalArgumentException("producerGroup cannot be null");
String producerKey = producerGroup + queueNum;
......@@ -55,12 +54,31 @@ public class RocketMQTemplate {
return producerMap.get(producerKey);
}
public static void send(String topic, String body) {
send(DEFAULT_PRODUCER_GROUP, topic, body);
}
public static void send(String producerGroup, String topic, String body) {
try {
getProducer(producerGroup).send(new Message(topic, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, body:{}",
producerGroup, topic, body, e);
throw new MessagingException(e.getMessage(), e);
}
}
public static void send(String topic, String tags, String keys, String body) {
send(DEFAULT_PRODUCER_GROUP, topic, tags, keys, body);
}
public static void send(String producerGroup, String topic, String tags, String keys, String body) {
try {
getProducer(producerGroup).send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
throw new MessagingException(e.getMessage(), e);
}
}
......@@ -78,6 +96,7 @@ public class RocketMQTemplate {
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
throw new MessagingException(e.getMessage(), e);
}
}
}
\ No newline at end of file
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.rocketmq.common.MixAll;
import org.junit.Before;
import org.junit.Test;
/**
* Created by shenhongxi on 2018/12/12.
*/
public class RocketMQTest {
private static final String TEST_TOPIC = "sdk-test";
@Before
public void namesrv() {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
}
@Test
public void send() {
int count = 0;
for (int i = 0; i < 20; i++) {
try {
RocketTemplate.send(TEST_TOPIC, "hello" + i);
count++;
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("send successful: " + count);
}
@Test
public void consume() throws Exception {
TestConsumer testConsumer = new TestConsumer();
testConsumer.startConsume("test-consumer", TEST_TOPIC);
System.out.println("test-consumer started");
Thread.sleep(30000);
}
}
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.rocketmq.common.message.MessageExt;
/**
* Created by shenhongxi on 2018/12/12.
*/
public class TestConsumer extends BaseConsumer {
@Override
protected void process(MessageExt messageExt) {
System.out.println(new String(messageExt.getBody()));
}
}
......@@ -5,7 +5,7 @@ import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
import org.hongxi.whatsmars.common.rocketmq.RocketMQTemplate;
import org.hongxi.whatsmars.common.rocketmq.RocketTemplate;
import org.hongxi.whatsmars.otter.extend.support.Column;
import org.hongxi.whatsmars.otter.extend.support.OtterData;
import org.springframework.util.StringUtils;
......@@ -77,7 +77,7 @@ public class MQEventProcessor extends AbstractEventProcessor {
tags = tags.substring(0, tags.length() - suffix.length());
}
RocketMQTemplate.sendOrderly("otter-producer", topic, tags, msgKey, JSON.toJSONString(otterData));
RocketTemplate.sendOrderly("otter-producer", topic, tags, msgKey, JSON.toJSONString(otterData));
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册