提交 97c55091 编写于 作者: 江南一点雨

消息发送高可用

上级 97c66c29
package org.javaboy.mailserver;
import org.javaboy.vhr.model.MailConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
......@@ -14,6 +15,6 @@ public class MailserverApplication {
@Bean
Queue queue() {
return new Queue("javaboy.mail.welcome");
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
}
package org.javaboy.mailserver.receiver;
import org.apache.commons.logging.LogFactory;
import org.javaboy.vhr.model.Employee;
import org.javaboy.vhr.model.Hr;
import org.javaboy.vhr.model.MailConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
......@@ -40,7 +38,7 @@ public class MailReceiver {
@Autowired
TemplateEngine templateEngine;
@RabbitListener(queues = "javaboy.mail.welcome")
@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
public void handler(Employee employee) {
logger.info(employee.toString());
//收到消息,发送邮件
......
package org.javaboy.vhr.mapper;
import org.apache.ibatis.annotations.Param;
import org.javaboy.vhr.model.MailSendLog;
import java.util.Date;
import java.util.List;
public interface MailSendLogMapper {
Integer updateMailSendLogStatus(@Param("msgId") String msgId, @Param("status") Integer status);
Integer insert(MailSendLog mailSendLog);
List<MailSendLog> getMailSendLogsByStatus();
Integer updateCount(@Param("msgId") String msgId, @Param("date") Date date);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.javaboy.vhr.mapper.MailSendLogMapper">
<update id="updateMailSendLogStatus">
update mail_send_log set status = #{status} where msgId=#{msgId};
</update>
<insert id="insert" parameterType="org.javaboy.vhr.model.MailSendLog">
insert into mail_send_log (msgId,empId,routeKey,exchange,tryTime,createTime) values (#{msgId},#{empId},#{routeKey},#{exchange},#{tryTime},#{createTime});
</insert>
<select id="getMailSendLogsByStatus" resultType="org.javaboy.vhr.model.MailSendLog">
select * from mail_send_log where status=0 and tryTime &lt; sysdate()
</select>
<update id="updateCount">
update mail_send_log set count=count+1,updateTime=#{date} where msgId=#{msgId};
</update>
</mapper>
\ No newline at end of file
......@@ -7,6 +7,10 @@
<facet type="web" name="Web">
<configuration>
<webroots />
<sourceRoots>
<root url="file://$MODULE_DIR$/src/main/java" />
<root url="file://$MODULE_DIR$/src/main/resources" />
</sourceRoots>
</configuration>
</facet>
</component>
......@@ -15,10 +19,13 @@
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.flywaydb:flyway-core:5.2.4" level="project" />
<orderEntry type="module" module-name="vhr-model" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.1.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-json:2.1.8.RELEASE" level="project" />
......
package org.javaboy.vhr.model;
public class MailConstants {
public static final Integer DELIVERING = 0;//消息投递中
public static final Integer SUCCESS = 1;//消息投递成功
public static final Integer FAILURE = 2;//消息投递失败
public static final Integer MAX_TRY_COUNT = 3;//最大重试次数
public static final Integer MSG_TIMEOUT = 1;//消息超时时间
public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue";
public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange";
public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key";
}
package org.javaboy.vhr.model;
import java.util.Date;
public class MailSendLog {
private String msgId;
private Integer empId;
//0 消息投递中 1 投递成功 2投递失败
private Integer status;
private String routeKey;
private String exchange;
private Integer count;
private Date tryTime;
private Date createTime;
private Date updateTime;
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public Integer getEmpId() {
return empId;
}
public void setEmpId(Integer empId) {
this.empId = empId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Date getTryTime() {
return tryTime;
}
public void setTryTime(Date tryTime) {
this.tryTime = tryTime;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
......@@ -7,6 +7,10 @@
<facet type="web" name="Web">
<configuration>
<webroots />
<sourceRoots>
<root url="file://$MODULE_DIR$/src/main/java" />
<root url="file://$MODULE_DIR$/src/main/resources" />
</sourceRoots>
</configuration>
</facet>
</component>
......@@ -15,6 +19,8 @@
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
......
package org.javaboy.vhr.config;
import org.javaboy.vhr.model.MailConstants;
import org.javaboy.vhr.service.MailSendLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public final static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Autowired
MailSendLogService mailSendLogService;
@Bean
RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if (ack) {
logger.info(msgId + ":消息发送成功");
mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功
} else {
logger.info(msgId + ":消息发送失败");
}
});
rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {
logger.info("消息发送失败");
});
return rabbitTemplate;
}
@Bean
Queue mailQueue() {
return new Queue(MailConstants.MAIL_QUEUE_NAME, true);
}
@Bean
DirectExchange mailExchange() {
return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME, true, false);
}
@Bean
Binding mailBinding() {
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
}
}
......@@ -2,9 +2,12 @@ package org.javaboy.vhr.service;
import org.javaboy.vhr.mapper.EmployeeMapper;
import org.javaboy.vhr.model.Employee;
import org.javaboy.vhr.model.MailConstants;
import org.javaboy.vhr.model.MailSendLog;
import org.javaboy.vhr.model.RespPageBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -13,6 +16,7 @@ import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.UUID;
/**
* @作者 江南一点雨
......@@ -29,17 +33,19 @@ public class EmployeeService {
EmployeeMapper employeeMapper;
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
MailSendLogService mailSendLogService;
public final static Logger logger = LoggerFactory.getLogger(EmployeeService.class);
SimpleDateFormat yearFormat = new SimpleDateFormat("yyyy");
SimpleDateFormat monthFormat = new SimpleDateFormat("MM");
DecimalFormat decimalFormat = new DecimalFormat("##.00");
public RespPageBean getEmployeeByPage(Integer page, Integer size, Employee employee,Date[] beginDateScope) {
public RespPageBean getEmployeeByPage(Integer page, Integer size, Employee employee, Date[] beginDateScope) {
if (page != null && size != null) {
page = (page - 1) * size;
}
List<Employee> data = employeeMapper.getEmployeeByPage(page, size, employee,beginDateScope);
Long total = employeeMapper.getTotal(employee,beginDateScope);
List<Employee> data = employeeMapper.getEmployeeByPage(page, size, employee, beginDateScope);
Long total = employeeMapper.getTotal(employee, beginDateScope);
RespPageBean bean = new RespPageBean();
bean.setData(data);
bean.setTotal(total);
......@@ -54,8 +60,17 @@ public class EmployeeService {
int result = employeeMapper.insertSelective(employee);
if (result == 1) {
Employee emp = employeeMapper.getEmployeeById(employee.getId());
logger.info(emp.toString());
rabbitTemplate.convertAndSend("javaboy.mail.welcome", emp);
//生成消息的唯一id
String msgId = UUID.randomUUID().toString();
MailSendLog mailSendLog = new MailSendLog();
mailSendLog.setMsgId(msgId);
mailSendLog.setCreateTime(new Date());
mailSendLog.setExchange(MailConstants.MAIL_EXCHANGE_NAME);
mailSendLog.setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME);
mailSendLog.setEmpId(emp.getId());
mailSendLog.setTryTime(new Date(System.currentTimeMillis() + 1000 * 60 * MailConstants.MSG_TIMEOUT));
mailSendLogService.insert(mailSendLog);
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(msgId));
}
return result;
}
......@@ -88,6 +103,10 @@ public class EmployeeService {
}
public Integer updateEmployeeSalaryById(Integer eid, Integer sid) {
return employeeMapper.updateEmployeeSalaryById(eid,sid);
return employeeMapper.updateEmployeeSalaryById(eid, sid);
}
public Employee getEmployeeById(Integer empId) {
return employeeMapper.getEmployeeById(empId);
}
}
package org.javaboy.vhr.service;
import org.javaboy.vhr.mapper.MailSendLogMapper;
import org.javaboy.vhr.model.MailSendLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
public class MailSendLogService {
@Autowired
MailSendLogMapper mailSendLogMapper;
public Integer updateMailSendLogStatus(String msgId, Integer status) {
return mailSendLogMapper.updateMailSendLogStatus(msgId, status);
}
public Integer insert(MailSendLog mailSendLog) {
return mailSendLogMapper.insert(mailSendLog);
}
public List<MailSendLog> getMailSendLogsByStatus() {
return mailSendLogMapper.getMailSendLogsByStatus();
}
public Integer updateCount(String msgId, Date date) {
return mailSendLogMapper.updateCount(msgId,date);
}
}
package org.javaboy.vhr.task;
import org.javaboy.vhr.model.Employee;
import org.javaboy.vhr.model.MailConstants;
import org.javaboy.vhr.model.MailSendLog;
import org.javaboy.vhr.service.EmployeeService;
import org.javaboy.vhr.service.MailSendLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
@Component
public class MailSendTask {
@Autowired
MailSendLogService mailSendLogService;
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
EmployeeService employeeService;
@Scheduled(cron = "0/10 * * * * ?")
public void mailResendTask() {
List<MailSendLog> logs = mailSendLogService.getMailSendLogsByStatus();
logs.forEach(mailSendLog->{
if (mailSendLog.getCount() >= 3) {
mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败
}else{
mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date());
Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId());
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId()));
}
});
}
}
......@@ -7,6 +7,10 @@
<facet type="web" name="Web">
<configuration>
<webroots />
<sourceRoots>
<root url="file://$MODULE_DIR$/src/main/java" />
<root url="file://$MODULE_DIR$/src/main/resources" />
</sourceRoots>
</configuration>
</facet>
</component>
......@@ -15,10 +19,13 @@
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.flywaydb:flyway-core:5.2.4" level="project" />
<orderEntry type="module" module-name="vhr-mapper" />
<orderEntry type="module" module-name="vhr-model" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.1.8.RELEASE" level="project" />
......
......@@ -4,10 +4,12 @@ import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableCaching
@MapperScan(basePackages = "org.javaboy.vhr.mapper")
@EnableScheduling
public class VhrApplication {
public static void main(String[] args) {
......
......@@ -2,19 +2,18 @@ spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.url=jdbc:mysql://localhost:3306/vhr?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
server.port=8081
logging.level.org.javaboy.vhr.mapper=debug
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=192.168.91.128
spring.rabbitmq.port=5672
spring.redis.host=localhost
# confirms ص
spring.rabbitmq.publisher-confirms=true
# returnedMessage ص
spring.rabbitmq.publisher-returns=true
spring.redis.host=192.168.91.128
spring.redis.database=0
spring.redis.port=6379
spring.redis.password=123
spring.cache.cache-names=menus_cache
\ No newline at end of file
......@@ -48,7 +48,19 @@ CREATE TABLE `appraise` (
KEY `pid` (`eid`),
CONSTRAINT `appraise_ibfk_1` FOREIGN KEY (`eid`) REFERENCES `employee` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `mail_send_log`;
CREATE TABLE `mail_send_log` (
`msgId` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`empId` int(11) DEFAULT NULL,
`status` int(11) DEFAULT '0' COMMENT '0发送中,1发送成功,2发送失败',
`routeKey` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`exchange` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`count` int(11) DEFAULT NULL COMMENT '重试次数',
`tryTime` date DEFAULT NULL COMMENT '第一次重试时间',
`createTime` date DEFAULT NULL,
`updateTime` date DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*Data for the table `appraise` */
/*Table structure for table `department` */
......
......@@ -25,6 +25,7 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.flywaydb:flyway-core:5.2.4" level="project" />
<orderEntry type="module" module-name="vhr-service" />
<orderEntry type="module" module-name="vhr-mapper" />
<orderEntry type="module" module-name="vhr-model" />
......@@ -123,6 +124,8 @@
<orderEntry type="library" name="Maven: org.springframework:spring-jcl:5.1.9.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-test:5.1.9.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.xmlunit:xmlunit-core:2.6.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: javax.xml.bind:jaxb-api:2.3.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: javax.activation:javax.activation-api:1.2.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.security:spring-security-test:5.1.6.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.security:spring-security-core:5.1.6.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-aop:5.1.9.RELEASE" level="project" />
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册