提交 aaf116d8 编写于 作者: 武汉红喜's avatar 武汉红喜

rocketmq spring

上级 cc2968ac
package org.hongxi.whatsmars.mq.rocketmq.spring;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
public class Consumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("spring/rocketmq-consumer.xml");
}
}
package org.hongxi.whatsmars.mq.rocketmq.spring;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
public class ConsumerFactoryBean implements FactoryBean<DefaultMQPushConsumer>,InitializingBean,DisposableBean {
private DefaultMQPushConsumer consumer;
private String consumerGroup;
private String namesrvAddr;
private String topic;
private String tags;
private MessageListenerConcurrently messageListener;
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setTags(String tags) {
this.tags = tags;
}
public void setMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
}
@Override
public DefaultMQPushConsumer getObject() throws Exception {
return consumer;
}
@Override
public Class<?> getObjectType() {
return DefaultMQPushConsumer.class;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void afterPropertiesSet() throws Exception {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, tags);
consumer.registerMessageListener(messageListener);
consumer.start();
}
@Override
public void destroy() throws Exception {
consumer.shutdown();
}
}
package org.hongxi.whatsmars.mq.rocketmq.spring;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DemoMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + messages + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
package org.hongxi.whatsmars.mq.rocketmq.spring;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
public class Producer {
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/rocketmq-producer.xml");
DefaultMQProducer producer = (DefaultMQProducer) context.getBean("defaultMQProducer");
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
}
}
package org.hongxi.whatsmars.mq.rocketmq.spring;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
public class ProducerFactoryBean implements FactoryBean<DefaultMQProducer>,InitializingBean,DisposableBean {
private DefaultMQProducer producer;
private String producerGroup;
private String namesrvAddr;
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
@Override
public DefaultMQProducer getObject() throws Exception {
return producer;
}
@Override
public Class<?> getObjectType() {
return DefaultMQProducer.class;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void afterPropertiesSet() throws Exception {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.start();
}
@Override
public void destroy() throws Exception {
producer.shutdown();
}
}
rocketmqHome=D:/github/rocketmq/distribution
rocketmqHome=/Users/javahongxi/github/rocketmq/distribution
namesrvAddr=127.0.0.1:9876
brokerName=broker-b
listenPort=20911
......
rocketmqHome=D:/github/rocketmq/distribution
rocketmqHome=/Users/javahongxi/github/rocketmq/distribution
namesrvAddr=127.0.0.1:9876
brokerName=broker-a
listenPort=10911
......
rocketmqHome=D:/github/rocketmq/distribution
rocketmqHome=/Users/javahongxi/github/rocketmq/distribution
listenPort=9876
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
default-autowire="byName">
<bean class="org.hongxi.whatsmars.mq.rocketmq.spring.ConsumerFactoryBean">
<property name="consumerGroup" value="quick_start_consumer_group" />
<property name="namesrvAddr" value="127.0.0.1:9876" />
<property name="topic" value="TopicTest" />
<property name="tags" value="*" />
<property name="messageListener" ref="demoMessageListener" />
</bean>
<bean id="demoMessageListener" class="org.hongxi.whatsmars.mq.rocketmq.spring.DemoMessageListener" />
</beans>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
default-autowire="byName">
<bean id="defaultMQProducer" class="org.hongxi.whatsmars.mq.rocketmq.spring.ProducerFactoryBean">
<property name="producerGroup" value="quick_start_producer_group" />
<property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>
</beans>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册