提交 af667e04 编写于 作者: H haoxr

refactor: 移除动态mq队列方式,死信队列关单优化

上级 c01d0d46
package com.youlai.common.rabbitmq.queue;
package com.youlai.mall.oms.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
......@@ -6,6 +6,7 @@ import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
......@@ -13,20 +14,36 @@ import java.util.Map;
/**
* 订单超时关单延时队列
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @author haoxr
* @date 2022/2/4 23:21
*/
@Deprecated
@Component
@Slf4j
public class OrderCloseQueue {
public class OrderCloseRabbitConfig {
// 延迟队列
private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue";
private static final String ORDER_EXCHANGE = "order.exchange";
private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay.routing.key";
// 死信队列
private static final String ORDER_ClOSE_QUEUE = "order.close.queue";
private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";
private static final String ORDER_ClOSE_ROUTING_KEY = "order.close.routing.key";
/**
* 定义交换机
*/
@Bean
public Exchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
return new DirectExchange(ORDER_EXCHANGE, true, false);
}
@Bean
public Exchange orderDlxExchange() {
return new DirectExchange(ORDER_DLX_EXCHANGE, true, false);
}
/**
......@@ -34,13 +51,12 @@ public class OrderCloseQueue {
*/
@Bean
public Queue orderDelayQueue() {
log.info("延时队列(order.delay.queue)创建");
// 延时队列的消息过期了,会自动触发消息的转发,根据routingKey发送到指定的exchange中,exchange路由到死信队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "order.exchange");
args.put("x-dead-letter-routing-key", "order.close.routing.key"); // 死信路由Key
args.put("x-message-ttl", 60 * 1000L); // 单位:毫秒,1分钟测试使用
return new Queue("order.delay.queue", true, false, false, args);
args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE);
args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key
args.put("x-message-ttl", 5 * 1000L); // 5s
return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args);
}
......@@ -49,7 +65,8 @@ public class OrderCloseQueue {
*/
@Bean
public Binding orderDelayQueueBinding() {
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order.exchange", "order.create.routing.key", null);
return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE,
ORDER_CLOSE_DELAY_ROUTING_KEY, null);
}
......@@ -59,7 +76,7 @@ public class OrderCloseQueue {
@Bean
public Queue orderCloseQueue() {
log.info("死信队列(order.close.queue)创建");
return new Queue("order.close.queue", true, false, false);
return new Queue(ORDER_ClOSE_QUEUE, true, false, false);
}
/**
......@@ -67,7 +84,8 @@ public class OrderCloseQueue {
*/
@Bean
public Binding orderCloseQueueBinding() {
return new Binding("order.close.queue", Binding.DestinationType.QUEUE, "order.exchange", "order.close.routing.key", null);
return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE,
ORDER_ClOSE_ROUTING_KEY, null);
}
}
package com.youlai.mall.oms.controller.app;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.youlai.common.base.IBaseEnum;
import com.youlai.common.result.PageResult;
import com.youlai.common.result.Result;
import com.youlai.mall.oms.common.enums.PayTypeEnum;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.form.OrderSubmitForm;
import com.youlai.mall.oms.pojo.query.OrderPageQuery;
......@@ -12,8 +10,6 @@ import com.youlai.mall.oms.pojo.vo.OrderConfirmVO;
import com.youlai.mall.oms.pojo.vo.OrderSubmitResultVO;
import com.youlai.mall.oms.service.OrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated;
......@@ -23,8 +19,7 @@ import org.springframework.web.bind.annotation.*;
* 「移动端」订单控制层
*
* @author huawei
* @email huawei_code@163.com
* @date 2020-12-30 22:31:10
* @date 2020/12/30
*/
@Api(tags = "「移动端」订单接口")
@RestController
......@@ -66,14 +61,9 @@ public class OrderController {
@ApiOperation("订单支付")
@PostMapping("/{orderId}/_pay")
@ApiImplicitParams({@ApiImplicitParam(name = "orderId", value = "订单ID", paramType = "path", dataType = "Long"), @ApiImplicitParam(name = "payType", value = "支付方式", paramType = "query", dataType = "Integer"), @ApiImplicitParam(name = "appId", value = "小程序appId", paramType = "query", dataType = "String")})
public <T> Result<T> pay(@PathVariable Long orderId, Integer payType, String appId) {
PayTypeEnum payTypeEnum = IBaseEnum.getEnumByValue(payType, PayTypeEnum.class);
if (payTypeEnum == null) {
return Result.failed("系统暂不支持该支付方式~");
}
return Result.success(orderService.pay(orderId, appId, payTypeEnum));
public Result payOrder(@PathVariable Long orderId) {
boolean result = orderService.payOrder(orderId);
return Result.judge(result);
}
@ApiOperation("订单删除")
......
package com.youlai.mall.oms.converter;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.form.OrderSubmitForm;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
/**
* 订单转化器
*
* @author haoxr
* @date 2022/12/21
*/
@Mapper(componentModel = "spring")
public interface OrderConverter {
@Mappings({
@Mapping(target = "orderSn", source = "orderToken"),
@Mapping(target = "totalQuantity", expression = "java(orderSubmitForm.getOrderItems().stream().map(OrderItemDTO::getCount).reduce(0, Integer::sum))"),
@Mapping(target = "totalAmount", expression = "java(orderSubmitForm.getOrderItems().stream().map(item -> item.getPrice() * item.getCount()).reduce(0L, Long::sum))"),
})
OmsOrder submitForm2Entity(OrderSubmitForm orderSubmitForm);
}
\ No newline at end of file
package com.youlai.mall.oms.converter;
import cn.hutool.core.collection.CollectionUtil;
import com.youlai.mall.oms.pojo.dto.OrderItemDTO;
import com.youlai.mall.oms.pojo.entity.OmsOrderItem;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* 订单对象转化器
*
* @author haoxr
* @date 2022/12/21
*/
@Mapper(componentModel = "spring")
public interface OrderItemConverter {
@Mappings({
@Mapping(target = "totalAmount", expression = "java(dto.getPrice() * dto.getCount())"),
@Mapping(target = "orderId", source = "orderId"),
})
OmsOrderItem dto2Entity(Long orderId, OrderItemDTO dto);
default List<OmsOrderItem> dto2Entity(Long orderId, List<OrderItemDTO> list) {
if (CollectionUtil.isNotEmpty(list)) {
List<OmsOrderItem> entities = list.stream().map(dto -> dto2Entity(orderId, dto))
.collect(Collectors.toList());
return entities;
}
return Collections.EMPTY_LIST;
}
}
\ No newline at end of file
......@@ -17,64 +17,65 @@ import java.io.IOException;
* @author haoxr
* @date 2022/12/19
*/
@Component
//@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseListener {
private final OrderService orderService;
// 普通队列(创建订单,超时未支付转发至关单(死信)队列)
private static final String ORDER_CREATE_QUEUE = "order.create.queue";
// 延迟队列
private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue";
private static final String ORDER_EXCHANGE = "order.exchange";
private static final String ORDER_CREATE_ROUTING_KEY = "order.create.routing.key";
private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay.routing.key";
// 死信队列(关单队列)
// 关单队列
private static final String ORDER_ClOSE_QUEUE = "order.close.queue";
private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";
private static final String ORDER_ClOSE_ROUTING_KEY = "order.close.routing.key";
/**
* 普通队列(创建订单)消费处理
* 延迟队列·
* <p>
* 超过 x-message-ttl 设定时间未被消费会转发到死信队列 ORDER_ClOSE_QUEUE
* 超过 x-message-ttl 设定时间未被消费转发到死信交换机
*/
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = ORDER_CREATE_QUEUE,
arguments =
{
@Argument(name = "x-dead-letter-exchange", value = ORDER_DLX_EXCHANGE),
@Argument(name = "x-dead-letter-routing-key", value = ORDER_ClOSE_ROUTING_KEY),
@Argument(name = "x-message-ttl", value = "10000", type = "java.lang.Long") // 单位毫秒
}),
exchange = @Exchange(value = ORDER_EXCHANGE),
key = {ORDER_CREATE_ROUTING_KEY}
@RabbitListener(bindings =
{
@QueueBinding(
value = @Queue(value = ORDER_CLOSE_DELAY_QUEUE,
arguments =
{
@Argument(name = "x-dead-letter-exchange", value = ORDER_DLX_EXCHANGE),
@Argument(name = "x-dead-letter-routing-key", value = ORDER_ClOSE_ROUTING_KEY),
@Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long") // 超时10s
}),
exchange = @Exchange(value = ORDER_EXCHANGE),
key = {ORDER_CLOSE_DELAY_ROUTING_KEY}
)
}, ackMode = "MANUAL" // 手动ACK
)
)
@RabbitHandler
public void handleOrderCreate(Message message, Channel channel) throws Exception {
public void handleOrderCloseDelay(String orderSn, Message message, Channel channel) throws IOException {
log.info("订单【{}】延时队列,10s内如果未支付将路由到关单队列", orderSn);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
/**
* @param 参数1
* @param deliveryTag 消息序号
* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)
* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)
*/
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag, false, false); // 等于 channel.basicReject(deliveryTag, false);
//channel.basicReject(deliveryTag, false); // 等于 channel.basicReject(deliveryTag, false);
}
/**
* 死信队列(关单)消费处理
* 关单队列
*/
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = ORDER_ClOSE_QUEUE, durable = "true"),
exchange = @Exchange(value = ORDER_DLX_EXCHANGE),
key = {ORDER_ClOSE_ROUTING_KEY}
),
ackMode = "MANUAL" // 手动ACK
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = ORDER_ClOSE_QUEUE, durable = "true"),
exchange = @Exchange(value = ORDER_DLX_EXCHANGE),
key = {ORDER_ClOSE_ROUTING_KEY}
)
}, ackMode = "MANUAL" // 手动ACK
)
@RabbitHandler
@RabbitListener(queues = "order.close.queue")
public void handleOrderClose(String orderSn, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序号
......@@ -84,7 +85,9 @@ public class OrderCloseListener {
orderService.closeOrder(orderSn);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicReject(deliveryTag, true);
// TODO 关单失败,入定时任务表
channel.basicReject(deliveryTag, false);
}
}
}
package com.youlai.mall.oms.pojo.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 订单提交结果
*
* @author huawei
* @date 2021/1/21
*/
@ApiModel("订单提交结果")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderSubmitResultVO {
@ApiModelProperty("订单ID")
private Long orderId;
@ApiModelProperty("订单编号,进入支付页面显示")
private String orderSn;
}
......@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.github.binarywang.wxpay.bean.notify.SignatureHeader;
import com.github.binarywang.wxpay.exception.WxPayException;
import com.youlai.mall.oms.common.enums.PayTypeEnum;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.query.OrderPageQuery;
import com.youlai.mall.oms.pojo.vo.OrderConfirmVO;
......@@ -40,12 +39,12 @@ public interface OrderService extends IService<OmsOrder> {
/**
* 订单支付
*/
<T> T pay(Long orderId, String appId, PayTypeEnum payTypeEnum);
boolean payOrder(Long orderId);
/**
* 系统关闭订单
*/
boolean closeOrder(String orderToken);
boolean closeOrder(String orderSn);
/**
* 删除订单
......
......@@ -53,6 +53,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
......@@ -146,7 +147,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
RequestContextHolder.setRequestAttributes(attributes);
String orderToken = businessSnGenerator.generateSerialNo("ORDER");
orderConfirmVO.setOrderToken(orderToken);
redisTemplate.opsForValue().set(ORDER_TOKEN_PREFIX + orderToken, orderToken);
redisTemplate.opsForValue().set(ORDER_RESUBMIT_LOCK_PREFIX + orderToken, orderToken);
}, threadPoolExecutor);
CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, getOrderTokenFuture).join();
......@@ -171,8 +172,9 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
new DefaultRedisScript<>(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end", Long.class
),
Collections.singletonList(ORDER_TOKEN_PREFIX + orderSn),
orderSn); // 释放锁成功则返回1
Collections.singletonList(ORDER_RESUBMIT_LOCK_PREFIX + orderSn),
orderSn
); // 释放锁成功则返回1
Assert.isTrue(releaseLockResult.equals(1l), "订单重复提交,请刷新页面后重试");
// 订单验价
......@@ -193,7 +195,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
LockStockDTO lockStockDTO = new LockStockDTO(orderSn, lockedSkus);
Result lockStockResult = skuFeignClient.lockStock(lockStockDTO);
Assert.isTrue(Result.isSuccess(lockStockResult), "订单提交失败:订单商品库存数量不足!");
Assert.isTrue(Result.isSuccess(lockStockResult), "订单提交失败:锁定商品库存失败!");
// 创建订单
OmsOrder orderEntity = orderConverter.submitForm2Entity(orderSubmitForm);
......@@ -203,11 +205,11 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
// 添加订单明细
Long orderId = orderEntity.getId();
if (result) {
List<OmsOrderItem> orderItemEntities = orderItemConverter.dto2Entity(orderId, orderItems);
result = orderItemService.saveBatch(orderItemEntities);
List<OmsOrderItem> itemEntities = orderItemConverter.dto2Entity(orderId, orderItems);
result = orderItemService.saveBatch(itemEntities);
if (result) {
// 订单超时未支付取消延时队列(TTL + 死信队列实现)
rabbitTemplate.convertAndSend("order.create.exchange", "order.create.routing.key", orderSn);
// 订单超时未支付关单
rabbitTemplate.convertAndSend("order.exchange", "order.close.delay.routing.key", orderSn);
}
}
Assert.isTrue(result, "订单提交失败");
......@@ -222,29 +224,29 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
*/
@Override
@GlobalTransactional
public <T> T pay(Long orderId, String appId, PayTypeEnum payTypeEnum) {
public boolean payOrder(Long orderId) {
OmsOrder order = this.getById(orderId);
Assert.isTrue(order != null, "订单不存在");
Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "订单不可支付,请检查订单状态");
RLock lock = redissonClient.getLock(ORDER_SN_PREFIX + order.getOrderSn());
RLock lock = redissonClient.getLock(ORDER_LOCK_PREFIX + order.getOrderSn());
try {
lock.lock();
T result;
switch (payTypeEnum) {
case WX_JSAPI:
result = (T) wxJsapiPay(appId, order);
break;
default:
result = (T) balancePay(order);
break;
}
// 扣减余额
memberFeignClient.deductBalance(SecurityUtils.getMemberId(), order.getPayAmount());
// 扣减库存
Result<?> deductStockResult = skuFeignClient.deductStock(order.getOrderSn());
Assert.isTrue(Result.isSuccess(deductStockResult), "扣减商品库存失败");
return result;
skuFeignClient.deductStock(order.getOrderSn());
// 修改订单状态 → 【已支付】
order.setStatus(OrderStatusEnum.PAID.getValue());
order.setPayType(PayTypeEnum.BALANCE.getValue());
order.setPayTime(new Date());
this.updateById(order);
// 支付成功删除购物车已勾选的商品
cartService.removeCheckedItem();
return true;
} finally {
//释放锁
if (lock.isLocked()) {
......@@ -253,29 +255,13 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
}
}
/**
* 余额支付
* 微信支付
*
* @param appId
* @param order
* @return
*/
private Boolean balancePay(OmsOrder order) {
// 扣减余额
Long memberId = SecurityUtils.getMemberId();
Long amount = order.getPayAmount();
Result<?> deductBalanceResult = memberFeignClient.deductBalance(memberId, amount);
Assert.isTrue(Result.isSuccess(deductBalanceResult), "扣减账户余额失败");
// 更新订单状态
order.setStatus(OrderStatusEnum.PAID.getValue());
order.setPayType(PayTypeEnum.BALANCE.getValue());
order.setPayTime(new Date());
this.updateById(order);
// 支付成功删除购物车已勾选的商品
cartService.removeCheckedItem();
return true;
}
private WxPayUnifiedOrderV3Result.JsapiResult wxJsapiPay(String appId, OmsOrder order) {
Long memberId = SecurityUtils.getMemberId();
Long payAmount = order.getPayAmount();
......@@ -291,7 +277,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
// 用户id前补零保证五位,对超出五位的保留后五位
String userIdFilledZero = String.format("%05d", memberId);
String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5);
// 在前面加上wxo(weixin order)等前缀是为了人工可以快速分辨订单号是下单还是退款、来自哪家支付机构等
// 在前面加上wxo(wx order)等前缀是为了人工可以快速分辨订单号是下单还是退款、来自哪家支付机构等
// 将时间戳+3位随机数+五位id组成商户订单号,规则参考自<a href="https://tech.meituan.com/2016/11/18/dianping-order-db-sharding.html">大众点评</a>
String outTradeNo = "wxo_" + System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId;
log.info("商户订单号拼接完成:{}", outTradeNo);
......@@ -330,7 +316,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
if (OrderStatusEnum.UNPAID.getValue().equals(order.getStatus())) {
result = this.update(new LambdaUpdateWrapper<OmsOrder>()
.eq(OmsOrder::getId, order.getId())
.set(OmsOrder::getStatus, OrderStatusEnum.CANCELED));
.set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue()));
// 关单成功释放锁定的商品库存
rabbitTemplate.convertAndSend("stock.exchange", "stock.release.routing.key", orderSn);
} else { // 订单非【待付款】状态无需关闭
......@@ -360,7 +346,6 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
return this.removeById(orderId);
}
@Override
public void handleWxPayOrderNotify(SignatureHeader signatureHeader, String notifyData) throws WxPayException {
log.info("开始处理支付结果通知");
......
......@@ -27,13 +27,13 @@ public interface SkuFeignClient {
* 解锁商品库存
*/
@PutMapping("/app-api/v1/sku/_unlock")
Result unlockStock(@RequestParam String orderToken);
Result unlockStock(@RequestParam String orderSn);
/**
* 扣减订单商品库存
*/
@PutMapping("/app-api/v1/sku/_deduct")
Result deductStock(@RequestParam String orderToken);
Result deductStock(@RequestParam String orderSn);
/**
* 订单商品验价
......
......@@ -4,7 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.youlai.common.result.Result;
import com.youlai.mall.pms.pojo.form.PmsCategoryAttributeForm;
import com.youlai.mall.pms.pojo.entity.PmsCategoryAttribute;
import com.youlai.mall.pms.service.IPmsAttributeService;
import com.youlai.mall.pms.service.AttributeService;
import io.swagger.annotations.*;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -25,7 +25,7 @@ import java.util.List;
@AllArgsConstructor
public class PmsAttributeController {
private IPmsAttributeService iPmsAttributeService;
private AttributeService attributeService;
@ApiOperation(value = "属性列表")
@GetMapping
......@@ -33,7 +33,7 @@ public class PmsAttributeController {
@ApiParam("商品分类ID") Long categoryId,
@ApiParam("类型(1:规格;2:属性)") Integer type
) {
List<PmsCategoryAttribute> list = iPmsAttributeService.list(new LambdaQueryWrapper<PmsCategoryAttribute>()
List<PmsCategoryAttribute> list = attributeService.list(new LambdaQueryWrapper<PmsCategoryAttribute>()
.eq(categoryId != null, PmsCategoryAttribute::getCategoryId, categoryId)
.eq(type != null, PmsCategoryAttribute::getType, type)
);
......@@ -43,7 +43,7 @@ public class PmsAttributeController {
@ApiOperation(value = "批量新增/修改")
@PostMapping("/batch")
public Result saveBatch(@RequestBody PmsCategoryAttributeForm pmsCategoryAttributeForm) {
boolean result = iPmsAttributeService.saveBatch(pmsCategoryAttributeForm);
boolean result = attributeService.saveBatch(pmsCategoryAttributeForm);
return Result.judge(result);
}
}
......@@ -7,7 +7,7 @@ import com.youlai.common.result.PageResult;
import com.youlai.common.result.Result;
import com.youlai.mall.pms.pojo.entity.PmsBrand;
import com.youlai.mall.pms.pojo.query.BrandPageQuery;
import com.youlai.mall.pms.service.IPmsBrandService;
import com.youlai.mall.pms.service.BrandService;
import io.swagger.annotations.*;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
......@@ -27,7 +27,7 @@ import java.util.List;
@RequiredArgsConstructor
public class PmsBrandController {
private final IPmsBrandService brandService;
private final BrandService brandService;
@ApiOperation(value = "品牌分页列表")
@GetMapping("/pages")
......
......@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
/**
* Swagger 配置
*
* @author haoxr
* @date 2021/02/25 15:36
*/
......
package com.youlai.common.rabbitmq.config;
import com.youlai.common.rabbitmq.dynamic.RabbitModuleInitializer;
import com.youlai.common.rabbitmq.dynamic.RabbitModuleProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -19,7 +15,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@Slf4j
public class RabbitMQConfig {
public class RabbitConfig {
/**
* 消息序列化配置
......@@ -28,17 +24,7 @@ public class RabbitMQConfig {
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setMessageConverter( new Jackson2JsonMessageConverter());
return factory;
}
/**
* 动态创建队列、交换机初始化器
*/
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
return new RabbitModuleInitializer(amqpAdmin, rabbitModuleProperties);
}
}
package com.youlai.common.rabbitmq.dynamic;
/**
* RabbitMQ 交换机类型枚举
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 10:34
*/
public enum RabbitExchangeTypeEnum {
/**
* 直连交换机
* <p>
* 根据routing-key精准匹配队列(最常使用)
*/
DIRECT,
/**
* 主题交换机
* <p>
* 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符
*/
TOPIC,
/**
* 扇形交换机
* <p>
* 直接分发给所有绑定的队列,忽略routing-key,用于广播消息
*/
FANOUT,
/**
* 头交换机
* <p>
* 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少)
*/
HEADERS;
}
package com.youlai.common.rabbitmq.dynamic;
import lombok.Data;
import java.util.Map;
/**
* RabbitMQ 队列和交换机机绑定关系实体对象
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 0:20
*/
@Data
public class RabbitModuleInfo {
/**
* 路由Key
*/
private String routingKey;
/**
* 队列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机信息类
*/
@Data
public static class Exchange {
/**
* 交换机类型
*/
private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.DIRECT; // 默认直连交换机
/**
* 交换机名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 当所有队绑定列均不在使用时,是否自动删除交换机
*/
private boolean autoDelete = false; // 默认false,不自动删除
/**
* 交换机其他参数
*/
private Map<String, Object> arguments;
}
/**
* 队列信息类
*/
@Data
public static class Queue {
/**
* 队列名称
*/
private String name;
/**
* 是否持久化
*/
private boolean durable = true; // 默认true持久化,重启消息不会丢失
/**
* 是否具有排他性
*/
private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列
/**
* 当消费者均断开连接,是否自动删除队列
*/
private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息
/**
* 绑定死信队列的交换机名称
*/
private String deadLetterExchange;
/**
* 绑定死信队列的路由key
*/
private String deadLetterRoutingKey;
private Map<String, Object> arguments;
}
}
package com.youlai.common.rabbitmq.dynamic;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* RabbitMQ队列初始化器
*
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 15:16
*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {
private AmqpAdmin amqpAdmin;
private RabbitModuleProperties rabbitModuleProperties;
public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
this.amqpAdmin = amqpAdmin;
this.rabbitModuleProperties = rabbitModuleProperties;
}
@Override
public void afterSingletonsInstantiated() {
log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
declareRabbitModule();
}
/**
* RabbitMQ 根据配置动态创建和绑定队列、交换机
*/
private void declareRabbitModule() {
List<RabbitModuleInfo> rabbitModuleInfos = rabbitModuleProperties.getModules();
if (CollectionUtil.isEmpty(rabbitModuleInfos)) {
return;
}
for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
configParamValidate(rabbitModuleInfo);
// 队列
Queue queue = convertQueue(rabbitModuleInfo.getQueue());
// 交换机
Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
// 绑定关系
String routingKey = rabbitModuleInfo.getRoutingKey();
String queueName = rabbitModuleInfo.getQueue().getName();
String exchangeName = rabbitModuleInfo.getExchange().getName();
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
// 创建队列
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
/**
* RabbitMQ动态配置参数校验
*
* @param rabbitModuleInfo
*/
public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
String routingKey = rabbitModuleInfo.getRoutingKey();
Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");
Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置queue的name属性", routingKey);
}
/**
* 转换生成RabbitMQ队列
*
* @param queue
* @return
*/
public Queue convertQueue(RabbitModuleInfo.Queue queue) {
Map<String, Object> arguments = queue.getArguments();
// 转换ttl的类型为long
if (arguments != null && arguments.containsKey("x-message-ttl")) {
arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
}
// 是否需要绑定死信队列
String deadLetterExchange = queue.getDeadLetterExchange();
String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
if (arguments == null) {
arguments = new HashMap<>(4);
}
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 转换生成RabbitMQ交换机
*
* @param exchangeInfo
* @return
*/
public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {
AbstractExchange exchange = null;
RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();
String exchangeName = exchangeInfo.getName();
boolean isDurable = exchangeInfo.isDurable();
boolean isAutoDelete = exchangeInfo.isAutoDelete();
Map<String, Object> arguments = exchangeInfo.getArguments();
switch (exchangeType) {
case DIRECT:// 直连交换机
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case TOPIC: // 主题交换机
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case FANOUT: //扇形交换机
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case HEADERS: // 头交换机
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
}
return exchange;
}
}
package com.youlai.common.rabbitmq.dynamic;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
/**
* @author <a href="mailto:xianrui0365@163.com">haoxr</a>
* @date 2022/4/4 14:51
*/
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Data
public class RabbitModuleProperties {
private List<RabbitModuleInfo> modules;
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.youlai.common.rabbitmq.config.RabbitMQConfig,\
com.youlai.common.rabbitmq.dynamic.RabbitModuleProperties
......@@ -23,7 +23,7 @@ public class CanalListener {
private final SysMenuService menuService;
@RabbitListener(queues = "canal.queue")
//@RabbitListener(queues = "canal.queue")
public void handleDataChange(@Payload CanalMessage message) {
String tableName = message.getTable();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册