提交 957580ac 编写于 作者: huawei_code1994's avatar huawei_code1994

feat:添加rabbitmq支持

上级 d2dfe6c4
package com.youlai.common.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
......@@ -33,16 +33,17 @@ public class RabbitMqConfig {
* @return
*/
@Bean
public MessageConverter messageConverter() {
public MessageConverter jackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 为容器创建号rabbitTemplate注册confirmCallback
* 消息由生产者投递到Broker/Exchange回调
* 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
* 步骤1:yaml文件中添加配置 spring.rabbitmq.publisher-confirm-type: correlated
* 步骤2:编写代码
*/
@PostConstruct
public void setExchangeCallback() {
public void setConfirmCallback() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
......@@ -63,27 +64,24 @@ public class RabbitMqConfig {
}
/**
*
* 注意下面两项必须同时配置,可以尝试不配置第二项,通过测试能够发现当消息路由到Queue失败(比如路由件错误)时,returnCallback并未被回调。
* # 开启阶段二(消息从E->Q)的确认回调 Exchange --> Queue returnCallback
* spring.rabbitmq.publisher-returns=true
* # 官方文档说此时这一项必须设置为true
* # 实际上这一项的作用是:消息【未成功到达】队列时,能监听到到路由不可达的消息,以异步方式优先调用我们自己设置的returnCallback,默认情况下,这个消息会被直接丢弃,无法监听到
*
* #为true,则交换机处理消息到路由失败,则会返回给生产者
* spring.rabbitmq.template.mandatory=true
*/
@PostConstruct
public void setQueueCallback() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 这个方法的参数并没有像 confirmCallback 那样提供boolean类型的ack,因此这个回调只能在【失败】情况下触发
* @param message 发送消息
* @param replyCode 回复错误码
* @param replyText 回复错误内容
* @param exchange 发送消息时指定的交换机
* @param routingKey 发送消息时使用的路由件
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]", message, exchange, routingKey, replyCode, replyText);
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]",
returnedMessage.getMessage(), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText());
}
});
}
......
package com.youlai.common.rabbitmq.demo.exchangeToExchange;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class Recv {
public static void main(String[] args) throws IOException, TimeoutException {
Send.declareExchanges();
Send.declareExchangesBindings();
Recv.declareQueues();
Recv.declareQueueBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("MobileQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("MobileQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("ACQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("ACQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("LightQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("LightQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("LaptopQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("LaptopQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("FanQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("FanQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
//Declare the Queues
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues for linked-direct-exchange
channel.queueDeclare("MobileQ", true, false, false, null);
channel.queueDeclare("ACQ", true, false, false, null);
channel.queueDeclare("LightQ", true, false, false, null);
//Create the Queues for home-direct-exchange
channel.queueDeclare("FanQ", true, false, false, null);
channel.queueDeclare("LaptopQ", true, false, false, null);
channel.close();
}
//Create the Bindings between respective Queues and Exchanges
public static void declareQueueBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("MobileQ", "linked-direct-exchange", "personalDevice");
channel.queueBind("ACQ", "linked-direct-exchange", "homeAppliance");
channel.queueBind("LightQ", "linked-direct-exchange", "homeAppliance");
//Create the bindings - with home-direct-exchange
channel.queueBind("FanQ", "home-direct-exchange", "homeAppliance");
channel.queueBind("LaptopQ", "home-direct-exchange", "personalDevice");
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.exchangeToExchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class Send {
public static void main(String[] args) throws IOException, TimeoutException {
Send.declareExchanges();
Send.declareExchangesBindings();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Direct message - Turn on the Home Appliances ";
channel.basicPublish("home-direct-exchange", "homeAppliance", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
//Declare the exchanges
public static void declareExchanges() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare both the exchanges - linked-direct-exchange and home-direct-exchange.
channel.exchangeDeclare("linked-direct-exchange", BuiltinExchangeType.DIRECT, true);
channel.exchangeDeclare("home-direct-exchange", BuiltinExchangeType.DIRECT, true);
channel.close();
}
//Create the Bindings between Exchanges.
public static void declareExchangesBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
// (destination-exchange, source-exchange , routingKey
channel.exchangeBind("linked-direct-exchange", "home-direct-exchange", "homeAppliance");
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.headers;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class HeadersRecv {
public static void main(String[] args) throws IOException, TimeoutException {
HeadersSend.declareExchange();
HeadersRecv.declareQueues();
HeadersRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("HealthQueue", true, false, false, null);
channel.queueDeclare("SportsQueue", true, false, false, null);
channel.queueDeclare("EducationQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey, headers) - routingKey != null
Map<String, Object> healthArgs = new HashMap<>();
healthArgs.put("x-match", "any"); //Match any of the header
healthArgs.put("h1", "Header1");
healthArgs.put("h2", "Header2");
channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs);
Map<String, Object> sportsArgs = new HashMap<>();
sportsArgs.put("x-match", "all"); //Match all of the header
sportsArgs.put("h1", "Header1");
sportsArgs.put("h2", "Header2");
channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs);
Map<String, Object> educationArgs = new HashMap<>();
educationArgs.put("x-match", "any"); //Match any of the header
educationArgs.put("h1", "Header1");
educationArgs.put("h2", "Header2");
channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs);
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.headers;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class HeadersSend {
public static void main(String[] args) throws IOException, TimeoutException {
HeadersSend.declareExchange();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Header Exchange example 1";
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h3", "Header3");
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
message = "Header Exchange example 2";
headerMap.put("h2", "Header2");
properties = new AMQP.BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true);
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.manager;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc 创建 Connection 连接工厂对象,创建连接
* @email huawei_code@163.com
* @date 2021/2/2
*/
public class ConnectionManager {
private static Connection connection = null;
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
connection = factory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
package com.youlai.common.rabbitmq.demo.pubSub;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 发布订阅模型实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class PubSubRecv {
public static void main(String[] args) throws IOException, TimeoutException {
PubSubSend.declareExchange();
PubSubRecv.declareQueues();
PubSubRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("WebClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("AppClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("WebClientQueue", true, false, false, null);
channel.queueDeclare("AppClientQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("WebClientQueue", "my-fanout-exchange", "");
channel.queueBind("AppClientQueue", "my-fanout-exchange", "");
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.pubSub;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 发布订阅模型
* 简单说明:
* 1、之前工作模型中,我们虽然由多个消费者但是每个消息只能被一个消费者消费;在发布订阅模型中只要订阅了这个消息的消费者都可以接收到消息
* 2、正规的 RabbitMQ 使用方式,消息生产者先将消息发送到 Exchange 交换机中,在根据一定的策略将消息投递到队列中,消息生产者甚至不用知道队列的存在
* 3、Exchange 交换机需要做两件事:第一、接收来自生产者发送的消息;第二、将消息投递到队列中
* 4、Exchange 交换机必须知道如何正确的将消息投递到队列中(Direct exchange、Fanout exchange、Topic exchange、Headers exchange)
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class PubSubSend {
public static void main(String[] args) {
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
PubSubSend.declareExchange();
String message = "Hello world !";
channel.basicPublish("my-fanout-exchange", "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Send message :" + message);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.routing;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 路由模型实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class RoutingRecv {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建队列,并建立队列与路由器绑定关系
RoutingRecv.declareQueues();
RoutingRecv.declareBindings();
// 2、消费消息查看消费结果
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("WebClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("AppClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("WebClientQueue", true, false, false, null);
channel.queueDeclare("AppClientQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("WebClientQueue", "my_exchange_direct", "info");
channel.queueBind("WebClientQueue", "my_exchange_direct", "error");
channel.queueBind("WebClientQueue", "my_exchange_direct", "warning");
channel.queueBind("AppClientQueue", "my_exchange_direct", "error");
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.routing;
import cn.hutool.core.util.CharsetUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
/**
* @author huawei
* @desc RabbitMQ 路由模型
* 简单说明:
* 1、路由模型需要指定交换机类型为 direct,交换机和队列之间通过路由键绑定,交换机只会把消息推送到符合路由键的队列中
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class RoutingSend {
private final static String EXCHANGE_NAME = "my_exchange_direct";
public static void main(String[] args) {
// 1、创建 rabbitmq 连接和信道
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
String message = "Direct exchange,这条消息路由键是:info";
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(CharsetUtil.UTF_8));
message = "Direct exchange,这条消息路由键是:error";
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(CharsetUtil.UTF_8));
message = "Direct exchange,这条消息路由键是:warning";
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(CharsetUtil.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.youlai.common.rabbitmq.demo.simple;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 简单队列实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class SimpleRecv {
public static void main(String[] args) throws IOException, TimeoutException {
// 2、创建连接,建立信道
Channel channel = ConnectionManager.getConnection().createChannel();
// 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费
channel.queueDeclare("hello", true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 4、接收处理消息并自动确认
channel.basicConsume("hello", true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}, consumerTag -> {
System.out.println(consumerTag);
});
}
}
package com.youlai.common.rabbitmq.demo.simple;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.nio.charset.StandardCharsets;
/**
* @author huawei
* @desc RabbitMQ 简单队列实战
* @email huawei_code@163.com
* @date 2021/1/29
*/
public class SimpleSend {
public static void main(String[] args) {
try (Channel channel = ConnectionManager.getConnection().createChannel();){
/**
* queue: 队列名称
* durable: 消息是否持久化
* exclusive: 消息是否排他
* autoDelete: 是否自动删除队列
* arguments: 其他参数(例如:死信队列等信息)
*/
channel.queueDeclare("hello", true, false, false, null);
String message = "hello!";
/**
* 参数:String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由
* routingKey: 路由键
* props: 配置信息
* body: 发送的消息
*/
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.youlai.common.rabbitmq.demo.topic;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ topic路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class TopicRecv {
public static void main(String[] args) throws IOException, TimeoutException {
TopicSend.declareExchange();
TopicRecv.declareQueues();
TopicRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("HealthQueue", true, false, false, null);
channel.queueDeclare("SportsQueue", true, false, false, null);
channel.queueDeclare("EducationQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("HealthQueue", "my-topic-exchange", "health.*");
channel.queueBind("SportsQueue", "my-topic-exchange", "#.sports.*");
channel.queueBind("EducationQueue", "my-topic-exchange", "#.education");
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 路由模型 - 通配符模式
* 简单说明:
* 1、路由模型需要指定交换机类型为 direct,交换机和队列之间通过路由键绑定,交换机只会把消息推送到符合路由键的队列中
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class TopicSend {
public static void main(String[] args) throws IOException, TimeoutException {
TopicSend.declareExchange();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Drink a lot of Water and stay Healthy!";
channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes());
message = "Learn something new everyday";
channel.basicPublish("my-topic-exchange", "education", null, message.getBytes());
message = "Stay fit in Mind and Body";
channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true);
channel.close();
}
}
package com.youlai.common.rabbitmq.demo.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* RabbitMQ 工作队列模型,是创建多个消费者共同消费消息,每个消息只可以被一个消费者处理
* 默认是轮询策略:使用轮询无法根据消费者消费速度合理分配消费数量,而是平均分配
* 需要限制消费者每次消费消息的数量,每次消费完了才可以进行下一次消费
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class ConsumerReceive1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2、创建连接,建立信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 模拟轮询策略,请将这部分注释
// 4、限制消费者每次消费消息数量,每次消息处理完成后才能消费下一条消息
int fetchCount = 1;
channel.basicQos(fetchCount);
// 4、创建回调消费消息,并手动确认收到消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("--- Consumer 1 Received Message: '" + message + "'");
//手工确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
package com.youlai.common.rabbitmq.demo.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* RabbitMQ 工作队列模型,是创建多个消费者共同消费消息,每个消息只可以被一个消费者处理
* 默认是轮询策略
*
* 简单队列模型和工作队列模型对比:
* 区别:
* 1、简单队列模型:是一个消费者和一个生产者
* 2、工作队列模型:由一个生产者生产消息,多个消费者共同消费消息,但是每个消息只可以被一个消费者处理
*
* 共同点:两种队列模型:都是直接将消息发送到Queue队列中,并没有Exchange交换机参与
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class WorkerRecv2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2、创建连接,建立信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 模拟轮询策略,请将这部分注释
// 4、限制消费者每次消费消息数量,每次消息处理完成后才能消费下一条消息
int fetchCount = 1;
channel.basicQos(fetchCount);
// 4、创建回调消费消息,并手动确认收到消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 5、关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
package com.youlai.common.rabbitmq.demo.work;
import cn.hutool.core.util.CharsetUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* <p>
* 消息生产能力大于消费能力,增加多几个消费节点
* 和简单队列类似,增加多个几个消费节点,处于竞争关系
* 默认策略:round robin 轮训
* @email huawei_code@163.com
* @date 2021/1/29
*/
public class WorkerSend {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
// 1、创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2、创建 rabbitmq 连接和信道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {
/**
* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue: 队列名称
* durable: 消息是否持久化
* exclusive: 消息是否排他
* autoDelete: 是否自动删除队列
* arguments: 其他参数(例如:死信队列等信息)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
/**
* 参数:String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由
* routingKey: 路由键
* props: 配置信息
* body: 发送的消息
*/
String message = "Hello world !" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(CharsetUtil.UTF_8));
System.out.println("--- Send Message: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册