提交 f4cf9ed1 编写于 作者: 檀越@新空间's avatar 檀越@新空间 🐭

fix:配置

上级 f13f3ea7
...@@ -12,5 +12,12 @@ ...@@ -12,5 +12,12 @@
<maven.compiler.source>8</maven.compiler.source> <maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target> <maven.compiler.target>8</maven.compiler.target>
</properties> </properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
...@@ -11,5 +11,18 @@ ...@@ -11,5 +11,18 @@
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-client:4.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-common:4.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-remoting:4.8.0" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.69" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-all:4.0.42.Final" level="project" />
<orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-logging:4.8.0" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-tcnative-boringssl-static:1.1.33.Fork26" level="project" />
<orderEntry type="library" name="Maven: commons-validator:commons-validator:1.6" level="project" />
<orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils:1.9.2" level="project" />
<orderEntry type="library" name="Maven: commons-digester:commons-digester:1.8.1" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" />
</component> </component>
</module> </module>
\ No newline at end of file
...@@ -11,23 +11,23 @@ import java.util.List; ...@@ -11,23 +11,23 @@ import java.util.List;
/** /**
* 消息的接受者 * 消息的接受者
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 11:45
*/ */
public class Consumer { public class Consumer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("base", "*"); consumer.subscribe("base", "*");
//设定消费模式:负载均衡|广播模式 //设定消费模式:负载均衡|广播模式
consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容 //接受消息内容
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
...@@ -40,4 +40,4 @@ public class Consumer { ...@@ -40,4 +40,4 @@ public class Consumer {
//5.启动消费者consumer //5.启动消费者consumer
consumer.start(); consumer.start();
} }
} }
\ No newline at end of file
...@@ -11,15 +11,13 @@ import java.util.concurrent.TimeUnit; ...@@ -11,15 +11,13 @@ import java.util.concurrent.TimeUnit;
* 发送异步消息 * 发送异步消息
*/ */
public class AsyncProducer { public class AsyncProducer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -34,6 +32,7 @@ public class AsyncProducer { ...@@ -34,6 +32,7 @@ public class AsyncProducer {
* 发送成功回调函数 * 发送成功回调函数
* @param sendResult * @param sendResult
*/ */
@Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult); System.out.println("发送结果:" + sendResult);
} }
...@@ -42,16 +41,15 @@ public class AsyncProducer { ...@@ -42,16 +41,15 @@ public class AsyncProducer {
* 发送失败回调函数 * 发送失败回调函数
* @param e * @param e
*/ */
@Override
public void onException(Throwable e) { public void onException(Throwable e) {
System.out.println("发送异常:" + e); System.out.println("发送异常:" + e);
} }
}); });
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
} }
\ No newline at end of file
...@@ -15,7 +15,7 @@ public class OneWayProducer { ...@@ -15,7 +15,7 @@ public class OneWayProducer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -16,7 +16,7 @@ public class SyncProducer { ...@@ -16,7 +16,7 @@ public class SyncProducer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -13,7 +13,7 @@ public class Consumer { ...@@ -13,7 +13,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("BatchTopic", "*"); consumer.subscribe("BatchTopic", "*");
......
...@@ -15,7 +15,7 @@ public class Producer { ...@@ -15,7 +15,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -14,7 +14,7 @@ public class Consumer { ...@@ -14,7 +14,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("DelayTopic", "*"); consumer.subscribe("DelayTopic", "*");
......
...@@ -16,7 +16,7 @@ public class Producer { ...@@ -16,7 +16,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -14,7 +14,7 @@ public class Consumer { ...@@ -14,7 +14,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
......
...@@ -13,7 +13,7 @@ public class Producer { ...@@ -13,7 +13,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -13,7 +13,7 @@ public class Consumer { ...@@ -13,7 +13,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");
......
...@@ -13,7 +13,7 @@ public class Producer { ...@@ -13,7 +13,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
......
...@@ -14,7 +14,7 @@ public class Consumer { ...@@ -14,7 +14,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic", "*"); consumer.subscribe("OrderTopic", "*");
......
...@@ -14,7 +14,7 @@ public class Producer { ...@@ -14,7 +14,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
//构建消息集合 //构建消息集合
......
...@@ -17,7 +17,7 @@ public class Consumer { ...@@ -17,7 +17,7 @@ public class Consumer {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址 //2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("TransactionTopic", "*"); consumer.subscribe("TransactionTopic", "*");
......
package transaction; package transaction;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -15,7 +16,7 @@ public class Producer { ...@@ -15,7 +16,7 @@ public class Producer {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("group5"); TransactionMQProducer producer = new TransactionMQProducer("group5");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//添加事务监听器 //添加事务监听器
producer.setTransactionListener(new TransactionListener() { producer.setTransactionListener(new TransactionListener() {
......
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876
rocketmq.producer.group=my-group rocketmq.producer.group=my-group
\ No newline at end of file
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876
rocketmq.producer.group=my-group rocketmq.producer.group=my-group
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册