提交 d63a5f9e 编写于 作者: C cbwleft
上级 d7a69ea5
......@@ -64,6 +64,10 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--wx_pay-->
<dependency>
<groupId>com.github.binarywang</groupId>
......
package org.xxpay.boot.service.mq;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import javax.jms.Queue;
/**
* @Description:
......@@ -19,6 +15,8 @@ public class MqConfig {
public static final String PAY_NOTIFY_QUEUE_NAME = "pay.notify.queue";
public static final String PAY_NOTIFY_EXCHANGE_NAME = "pay.notify.exchange";
public static class Impl{
public static final String ACTIVE_MQ = "activeMQ";
public static final String RABBIT_MQ = "rabbitMQ";
......
package org.xxpay.boot.service.mq.impl;
import javax.annotation.PostConstruct;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.xxpay.boot.service.mq.Mq4PayNotify;
import org.xxpay.boot.service.mq.MqConfig;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_EXCHANGE_NAME;
@Component
@Profile(MqConfig.Impl.RABBIT_MQ)
public class RabbitMq4PayNotify extends Mq4PayNotify {
@Autowired
private AmqpAdmin amqpAdmin;
@PostConstruct
public void init() {
DirectExchange exchange = new DirectExchange(PAY_NOTIFY_EXCHANGE_NAME);
exchange.setDelayed(true);
Queue queue = new Queue(PAY_NOTIFY_QUEUE_NAME);
Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(binding);
}
@Autowired
private AmqpTemplate rabbitTemplate;
@Override
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
rabbitTemplate.convertAndSend(PAY_NOTIFY_QUEUE_NAME, msg);
}
@Override
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
rabbitTemplate.convertAndSend(PAY_NOTIFY_EXCHANGE_NAME, PAY_NOTIFY_QUEUE_NAME, msg, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay((int) delay);
return message;
}
});
}
@RabbitListener(queues = PAY_NOTIFY_QUEUE_NAME)
public void onMessage(String msg) {
receive(msg);
}
}
......@@ -24,7 +24,9 @@ spring:
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
profiles:
active: prod
include: activeMQ
include:
#- activeMQ
- rabbitMQ # 需要安装延迟队列插件:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
activemq:
broker-url: failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=0)
......@@ -32,6 +34,12 @@ spring:
pool:
enabled: true # 如果此处设置为true,需要加activemq-pool依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
rabbitmq:
addresses: 127.0.0.1:5672
username: guest
password: guest
dynamic: true
config:
ali:
notify_url: http://api.xxpay.org/notify/pay/aliPayNotifyRes.htm
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册