提交 6ecd501a 编写于 作者: 小傅哥's avatar 小傅哥

小傅哥,feat:Rocket MQ 配置和使用

上级 a6be6166
# 命令创建 消费者组 Group xfg-group 你可以更换你需要的
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group
docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group
docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq
docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group
\ No newline at end of file
docker exec -it rocketmq-broker sh mqadmin updateSubGroup -n rocketmq-namesrv:9876 -g test-group -t xfg-dev-tech-rocketmq
\ No newline at end of file
docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq
# 命令创建 主题消息 Topic xfg-mq 你可以更换你需要的
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq
# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d
# 启动服务 docker-compose up -d
# 关闭服务 docker-compose down
version: '3'
services:
# RocketMQ https://hub.docker.com/r/apache/rocketmq/tags
rocketmq-namesrv:
image: apache/rocketmq:4.9.7
container_name: rocketmq-namesrv
command: sh mqnamesrv
ports:
- "9876:9876"
rocketmq-broker:
image: apache/rocketmq:4.9.7
container_name: rocketmq-broker
command: sh mqbroker -n rocketmq-namesrv:9876
ports:
- "10911:10911"
- "10909:10909"
depends_on:
- rocketmq-namesrv
rocketmq-dashboard:
image: apacherocketmq/rocketmq-dashboard
container_name: rocketmq-dashboard
ports:
- "8080:8080"
environment:
- "JAVA_OPTS=-Drocketmq.config.namesrvAddr=rocketmq-namesrv:9876"
\ No newline at end of file
version: '3'
services:
# https://hub.docker.com/r/xuchengen/rocketmq
# broker.conf brokerIP1=127.0.0.1
# console/config/application.properties server.port=9009
# 注意修改项;
# 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
# 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
rocketmq:
image: livinphp/rocketmq:5.1.0
container_name: rocketmq
......
version: '3'
services:
rocketmq:
image: xuchengen/rocketmq:latest
container_name: rocketmq
hostname: rocketmq
restart: always
ports:
- "7397:7397"
- "9876:9876"
- "10909:10909"
- "10911:10911"
- "10912:10912"
volumes:
- ./data:/home/app/data
- /etc/localtime:/etc/localtime
- /var/run/docker.sock:/var/run/docker.sock
network_mode: host
version: '3'
services:
rocketmq:
image: xuchengen/rocketmq:latest
container_name: rocketmq
hostname: rocketmq
restart: always
ports:
- "7397:7397"
- "9876:9876"
- "10909:10909"
- "10911:10911"
- "10912:10912"
volumes:
- ./data:/home/app/data
- /etc/localtime:/etc/localtime
- /var/run/docker.sock:/var/run/docker.sock
network_mode: host
......@@ -61,12 +61,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.0</version>
<version>5.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<version>2.2.0</version>
</dependency>
</dependencies>
</dependencyManagement>
......
......@@ -64,12 +64,10 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 自身模块 begin -->
......@@ -78,6 +76,11 @@
<artifactId>xfg-dev-tech-infrastructure</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-trigger</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 自身模块 end -->
</dependencies>
......
......@@ -14,6 +14,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class Application {
public static void main(String[] args) {
/*
* 指定使用的日志框架,否则将会告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
......
package cn.bugstack.xfg.dev.tech.mq;
import lombok.Setter;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
public void sendMessage(String topic, String message) {
rocketmqTemplate.convertAndSend(topic, message);
}
}
package cn.bugstack.xfg.dev.tech.test;
import cn.bugstack.xfg.dev.tech.mq.RocketMQProducer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringRunner;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
public class RocketMQTest {
@Autowired
private RocketMQProducer rocketMQProducer;
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
@Test
public void test() throws InterruptedException {
while (true) {
rocketMQProducer.sendMessage("xfg-mq", "我是测试消息");
rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息");
Thread.sleep(3000);
}
}
}
......@@ -24,10 +24,10 @@ public class ISalaryAdjustApplyServiceTest {
private ISalaryAdjustApplyService salaryAdjustApplyService;
@Test
public void test_execSalaryAdjust() {
public void test_execSalaryAdjust() throws InterruptedException {
AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
.employeeNumber("10000001")
.orderId("100908977676002")
.orderId("100908977676003")
.employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
.employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
.adjustTotalAmount(new BigDecimal(100))
......@@ -38,6 +38,8 @@ public class ISalaryAdjustApplyServiceTest {
String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
Thread.sleep(Integer.MAX_VALUE);
}
}
......@@ -34,6 +34,14 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 自身模块 begin -->
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-types</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 自身模块 End -->
</dependencies>
<build>
......
package cn.bugstack.xfg.dev.tech.domain.salary.event;
import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate;
import cn.bugstack.xfg.frame.types.BaseEvent;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import java.util.Date;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description 消息
* @create 2023-07-29 10:45
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {
public static String TOPIC = "xfg-mq";
public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
SalaryAdjustEvent event = new SalaryAdjustEvent();
event.setId(RandomStringUtils.randomNumeric(11));
event.setTimestamp(new Date());
event.setData(adjustSalaryApplyOrderAggregate);
return event;
}
}
......@@ -22,6 +22,15 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<!-- 自身模块 begin -->
<dependency>
......
package cn.bugstack.xfg.dev.tech.infrastructure.event;
import cn.bugstack.xfg.frame.types.BaseEvent;
import com.alibaba.fastjson2.JSON;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description 事件发布,消息推送。你可以在这里扩展各类的消息推送方式,如;异步消息、延迟消息、顺序消息、事务消息。官网:
* @create 2023-07-29 09:51
*/
@Component
@Slf4j
public class EventPublisher {
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
/**
* 普通消息
*
* @param topic 主题
* @param message 消息
*/
public void publish(String topic, BaseEvent<?> message) {
try {
String mqMessage = JSON.toJSONString(message);
log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage);
rocketmqTemplate.convertAndSend(topic, mqMessage);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
// 大部分MQ发送失败后,会需要任务补偿
}
}
/**
* 延迟消息
*
* @param topic 主题
* @param message 消息
* @param delayTimeLevel 延迟时长
*/
public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
try {
String mqMessage = JSON.toJSONString(message);
log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage);
rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
} catch (Exception e) {
log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
// 大部分MQ发送失败后,会需要任务补偿
}
}
}
package cn.bugstack.xfg.dev.tech.infrastructure.repository;
import cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent;
import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate;
import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeEntity;
import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeSalaryAdjustEntity;
......@@ -7,6 +8,7 @@ import cn.bugstack.xfg.dev.tech.domain.salary.repository.ISalaryAdjustRepository
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeDAO;
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryAdjustDAO;
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryDAO;
import cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeePO;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryAdjustPO;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryPO;
......@@ -26,6 +28,8 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository {
private IEmployeeSalaryDAO employeeSalaryDAO;
@Resource
private IEmployeeSalaryAdjustDAO employeeSalaryAdjustDAO;
@Resource
private EventPublisher eventPublisher;
/**
* Spring Boot 事务管理的级别可以通过 `@Transactional` 注解的 `isolation` 属性进行配置。常见的事务隔离级别有以下几种:
......@@ -85,6 +89,13 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository {
// 写入流水
employeeSalaryAdjustDAO.insert(employeeSalaryAdjustPO);
/*
* 发送消息,实际应用常见建议
* 1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。
* 2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。
*/
eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
return orderId;
}
......
......@@ -30,6 +30,15 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<!-- 自身模块 begin -->
<dependency>
......
......@@ -14,26 +14,11 @@ import java.util.concurrent.ThreadPoolExecutor;
@RestController
public class Controller {
@Resource
private ThreadPoolExecutor threadPoolExecutor;
/**
* http://localhost:8090/success
*/
@RequestMapping("/success")
public Response<String> success() {
log.info("测试调用");
try {
// 随机休眠
Thread.sleep(new Random().nextInt(1000));
// 开启线程
threadPoolExecutor.execute(() -> {
log.info("开启线程");
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (new Random().nextInt(100) == 1) throw new RuntimeException("异常");
return Response.<String>builder()
.code(Constants.ResponseCode.SUCCESS.getCode())
.info(Constants.ResponseCode.SUCCESS.getInfo())
......
package cn.bugstack.xfg.dev.tech.mq;
package cn.bugstack.xfg.dev.tech.trigger.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description 接收消息
* @create 2023-07-29 11:22
*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class RocketMQConsumer implements RocketMQListener<String> {
public class SalaryAdjustMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
// 处理消息逻辑
public void onMessage(String s) {
log.info("接收到MQ消息 {}", s);
}
}
package cn.bugstack.xfg.frame.types;
import lombok.Data;
import java.util.Date;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description
* @create 2023-07-29 10:47
*/
@Data
public class BaseEvent<T> {
private String id;
private Date timestamp;
private T data;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册