提交 fee19802 编写于 作者: 郝先瑞

feat(common-rabbitmq): 动态创建RabbitMQ的队列、交换机和绑定关系

Closes #I511UE
上级 c46bc483
package com.youlai.common.rabbitmq.config;
import com.youlai.common.rabbitmq.dynamic.RabbitModuleInitializer;
import com.youlai.common.rabbitmq.dynamic.RabbitModuleProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author huawei
* @desc
* @email huawei_code@163.com
* @date 2021/1/17
*/
@Configuration
@EnableTransactionManagement
public class RabbitMQConfig {
@Slf4j
public class RabbitConfig {
/**
* 使用json序列化机制,进行消息转换
*
......@@ -25,4 +30,15 @@ public class RabbitMQConfig {
return new Jackson2JsonMessageConverter();
}
/**
* 动态创建队列、交换机初始化器
*
* @return
*/
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
return new RabbitModuleInitializer(amqpAdmin, rabbitModuleProperties);
}
}
package com.youlai.common.rabbitmq.dynamic;
/**
* RabbitMQ 交换机类型枚举
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 10:34
*/
public enum RabbitExchangeTypeEnum {
/**
* 直连交换机
* <p>
* 根据routing-key精准匹配队列(最常使用)
*/
DIRECT,
/**
* 主题交换机
* <p>
* 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符
*/
TOPIC,
/**
* 扇形交换机
* <p>
* 直接分发给所有绑定的队列,忽略routing-key,用于广播消息
*/
FANOUT,
/**
* 头交换机
* <p>
* 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少)
*/
HEADERS;
}
package com.youlai.common.rabbitmq.dynamic;
import lombok.Data;
import java.util.Map;
/**
* RabbitMQ 队列和交换机机绑定关系实体对象
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 0:20
*/
@Data
public class RabbitModuleInfo {
/**
* 路由Key
*/
private String routingKey;
/**
* 队列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机信息类
*/
@Data
public static class Exchange {
/**
* 交换机类型
*/
private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.DIRECT; // 默认直连交换机
/**
* 交换机名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 当所有队绑定列均不在使用时,是否自动删除交换机
*/
private boolean autoDelete = false; // 默认false,不自动删除
/**
* 是否延迟交换机
*/
private boolean delayed;
/**
* 交换机其他参数
*/
private Map<String, Object> arguments;
}
/**
* 队列信息类
*/
@Data
public static class Queue {
/**
* 队列名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 是否具有排他性
*/
private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列
/**
* 当消费者均断开连接,是否自动删除队列
*/
private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息
/**
* 绑定死信队列的交换机名称
*/
private String deadExchangeName;
/**
* 绑定死信队列的路由key
*/
private String deadRoutingKey;
private Long messageTtl;
private Map<String, Object> arguments;
}
}
package com.youlai.common.rabbitmq.dynamic;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* RabbitMQ队列初始化器
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 15:16
*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {
private AmqpAdmin amqpAdmin;
private RabbitModuleProperties rabbitModuleProperties;
public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
this.amqpAdmin = amqpAdmin;
this.rabbitModuleProperties = rabbitModuleProperties;
}
@Override
public void afterSingletonsInstantiated() {
log.info("初始化 RabbitMQ 队列、交换机和绑定关系");
initRabbitModule();
}
/**
* 初始化 RabbitMQ 队列、交换机和绑定关系
*/
private void initRabbitModule() {
List<RabbitModuleInfo> rabbitModuleInfos = rabbitModuleProperties.getModules();
if (CollectionUtil.isEmpty(rabbitModuleInfos)) {
return;
}
for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
configParamValidate(rabbitModuleInfo);
// 队列
Queue queue = convertToQueue(rabbitModuleInfo.getQueue());
// 交换机
Exchange exchange = convertToExchange(rabbitModuleInfo.getExchange());
// 绑定关系
String routingKey = rabbitModuleInfo.getRoutingKey();
String queueName = rabbitModuleInfo.getQueue().getName();
String exchangeName = rabbitModuleInfo.getExchange().getName();
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
// 创建队列
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
/**
* RabbitMQ动态配置参数校验
*
* @param rabbitModuleInfo
*/
public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
String routingKey = rabbitModuleInfo.getRoutingKey();
Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");
Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
}
/**
* 转换生成RabbitMQ队列
*
* @param queue
* @return
*/
public Queue convertToQueue(RabbitModuleInfo.Queue queue) {
Map<String, Object> arguments = queue.getArguments();
// 是否需要绑定死信队列
String deadExchangeName = queue.getDeadExchangeName();
String deadRoutingKey = queue.getDeadRoutingKey();
if (StrUtil.isNotBlank(deadExchangeName) && StrUtil.isNotBlank(deadRoutingKey)) {
if (arguments != null) {
arguments = new HashMap<>(4);
arguments.put("x-dead-letter-exchange", deadExchangeName);
arguments.put("x-dead-letter-routing-key", deadRoutingKey);
}
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 转换生成RabbitMQ交换机
*
* @param exchangeInfo
* @return
*/
public Exchange convertToExchange(RabbitModuleInfo.Exchange exchangeInfo) {
AbstractExchange exchange = null;
RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();
String exchangeName = exchangeInfo.getName();
boolean isDurable = exchangeInfo.isDurable();
boolean isAutoDelete = exchangeInfo.isAutoDelete();
Map<String, Object> arguments = exchangeInfo.getArguments();
switch (exchangeType) {
case DIRECT:// 直连交换机
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case TOPIC: // 主题交换机
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case FANOUT: //扇形交换机
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case HEADERS: // 头交换机
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
}
exchange.setDelayed(exchangeInfo.isDelayed());
return exchange;
}
}
package com.youlai.common.rabbitmq.dynamic;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
/**
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 14:51
*/
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Data
public class RabbitModuleProperties {
private List<RabbitModuleInfo> modules;
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.youlai.common.rabbitmq.config.RabbitMQConfig,\
com.youlai.common.rabbitmq.queue.OrderCloseQueue
com.youlai.common.rabbitmq.config.RabbitConfig,\
com.youlai.common.rabbitmq.queue.OrderCloseQueue,\
com.youlai.common.rabbitmq.dynamic.RabbitModuleProperties
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册