From 6ecd501afc1491319c57abcad599f4bed085765b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B0=8F=E5=82=85=E5=93=A5?= <184172133@qq.com>
Date: Sat, 29 Jul 2023 13:35:41 +0800
Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E5=82=85=E5=93=A5=EF=BC=8Cfeat?=
=?UTF-8?q?=EF=BC=9ARocket=20MQ=20=E9=85=8D=E7=BD=AE=E5=92=8C=E4=BD=BF?=
=?UTF-8?q?=E7=94=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
docs/docker/create-group.sh | 2 +
docs/docker/create-topic-consumer.sh | 3 -
docs/docker/create-topic-producer.sh | 1 -
docs/docker/create-topic.sh | 3 +-
docs/docker/docker-compose-rocketmq.yml | 28 ---------
...> rocketmq-docker-compose-mac-amd-arm.yml} | 5 +-
.../rocketmq-docker-compose-mac-amd.yml | 18 ++++++
.../rocketmq-docker-compose-windows.yml | 18 ++++++
pom.xml | 4 +-
xfg-dev-tech-app/pom.xml | 7 ++-
.../cn/bugstack/xfg/dev/tech/Application.java | 1 +
.../xfg/dev/tech/mq/RocketMQConsumer.java | 17 ------
.../xfg/dev/tech/mq/RocketMQProducer.java | 19 ------
...ApplicationTest.java => RocketMQTest.java} | 14 +++--
.../domain/ISalaryAdjustApplyServiceTest.java | 6 +-
xfg-dev-tech-domain/pom.xml | 8 +++
.../salary/event/SalaryAdjustEvent.java | 29 +++++++++
xfg-dev-tech-infrastructure/pom.xml | 9 +++
.../infrastructure/event/EventPublisher.java | 59 +++++++++++++++++++
.../repository/SalaryAdjustRepository.java | 11 ++++
xfg-dev-tech-trigger/pom.xml | 9 +++
.../xfg/dev/tech/trigger/http/Controller.java | 15 -----
.../trigger/mq/SalaryAdjustMQListener.java | 23 ++++++++
.../bugstack/xfg/frame/types/BaseEvent.java | 19 ++++++
24 files changed, 231 insertions(+), 97 deletions(-)
create mode 100644 docs/docker/create-group.sh
delete mode 100644 docs/docker/create-topic-consumer.sh
delete mode 100644 docs/docker/create-topic-producer.sh
delete mode 100644 docs/docker/docker-compose-rocketmq.yml
rename docs/docker/{rocketmq-docker-compose.yml => rocketmq-docker-compose-mac-amd-arm.yml} (56%)
create mode 100644 docs/docker/rocketmq-docker-compose-mac-amd.yml
create mode 100644 docs/docker/rocketmq-docker-compose-windows.yml
delete mode 100644 xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java
delete mode 100644 xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java
rename xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/{ApplicationTest.java => RocketMQTest.java} (59%)
create mode 100644 xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java
create mode 100644 xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java
create mode 100644 xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java
create mode 100644 xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java
diff --git a/docs/docker/create-group.sh b/docs/docker/create-group.sh
new file mode 100644
index 0000000..2004b32
--- /dev/null
+++ b/docs/docker/create-group.sh
@@ -0,0 +1,2 @@
+# 命令创建 消费者组 Group xfg-group 你可以更换你需要的
+docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group
diff --git a/docs/docker/create-topic-consumer.sh b/docs/docker/create-topic-consumer.sh
deleted file mode 100644
index 0181b3b..0000000
--- a/docs/docker/create-topic-consumer.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group
-docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq
-docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group
\ No newline at end of file
diff --git a/docs/docker/create-topic-producer.sh b/docs/docker/create-topic-producer.sh
deleted file mode 100644
index e31c221..0000000
--- a/docs/docker/create-topic-producer.sh
+++ /dev/null
@@ -1 +0,0 @@
-docker exec -it rocketmq-broker sh mqadmin updateSubGroup -n rocketmq-namesrv:9876 -g test-group -t xfg-dev-tech-rocketmq
\ No newline at end of file
diff --git a/docs/docker/create-topic.sh b/docs/docker/create-topic.sh
index 9769877..63393f6 100644
--- a/docs/docker/create-topic.sh
+++ b/docs/docker/create-topic.sh
@@ -1 +1,2 @@
-docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq
+# 命令创建 主题消息 Topic xfg-mq 你可以更换你需要的
+docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq
diff --git a/docs/docker/docker-compose-rocketmq.yml b/docs/docker/docker-compose-rocketmq.yml
deleted file mode 100644
index 0e2f956..0000000
--- a/docs/docker/docker-compose-rocketmq.yml
+++ /dev/null
@@ -1,28 +0,0 @@
-# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d
-# 启动服务 docker-compose up -d
-# 关闭服务 docker-compose down
-version: '3'
-services:
- # RocketMQ https://hub.docker.com/r/apache/rocketmq/tags
- rocketmq-namesrv:
- image: apache/rocketmq:4.9.7
- container_name: rocketmq-namesrv
- command: sh mqnamesrv
- ports:
- - "9876:9876"
- rocketmq-broker:
- image: apache/rocketmq:4.9.7
- container_name: rocketmq-broker
- command: sh mqbroker -n rocketmq-namesrv:9876
- ports:
- - "10911:10911"
- - "10909:10909"
- depends_on:
- - rocketmq-namesrv
- rocketmq-dashboard:
- image: apacherocketmq/rocketmq-dashboard
- container_name: rocketmq-dashboard
- ports:
- - "8080:8080"
- environment:
- - "JAVA_OPTS=-Drocketmq.config.namesrvAddr=rocketmq-namesrv:9876"
\ No newline at end of file
diff --git a/docs/docker/rocketmq-docker-compose.yml b/docs/docker/rocketmq-docker-compose-mac-amd-arm.yml
similarity index 56%
rename from docs/docker/rocketmq-docker-compose.yml
rename to docs/docker/rocketmq-docker-compose-mac-amd-arm.yml
index 16adb23..d120398 100644
--- a/docs/docker/rocketmq-docker-compose.yml
+++ b/docs/docker/rocketmq-docker-compose-mac-amd-arm.yml
@@ -1,8 +1,9 @@
version: '3'
services:
# https://hub.docker.com/r/xuchengen/rocketmq
- # broker.conf brokerIP1=127.0.0.1
- # console/config/application.properties server.port=9009
+ # 注意修改项;
+ # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
+ # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
rocketmq:
image: livinphp/rocketmq:5.1.0
container_name: rocketmq
diff --git a/docs/docker/rocketmq-docker-compose-mac-amd.yml b/docs/docker/rocketmq-docker-compose-mac-amd.yml
new file mode 100644
index 0000000..b8c7e66
--- /dev/null
+++ b/docs/docker/rocketmq-docker-compose-mac-amd.yml
@@ -0,0 +1,18 @@
+version: '3'
+services:
+ rocketmq:
+ image: xuchengen/rocketmq:latest
+ container_name: rocketmq
+ hostname: rocketmq
+ restart: always
+ ports:
+ - "7397:7397"
+ - "9876:9876"
+ - "10909:10909"
+ - "10911:10911"
+ - "10912:10912"
+ volumes:
+ - ./data:/home/app/data
+ - /etc/localtime:/etc/localtime
+ - /var/run/docker.sock:/var/run/docker.sock
+ network_mode: host
diff --git a/docs/docker/rocketmq-docker-compose-windows.yml b/docs/docker/rocketmq-docker-compose-windows.yml
new file mode 100644
index 0000000..b8c7e66
--- /dev/null
+++ b/docs/docker/rocketmq-docker-compose-windows.yml
@@ -0,0 +1,18 @@
+version: '3'
+services:
+ rocketmq:
+ image: xuchengen/rocketmq:latest
+ container_name: rocketmq
+ hostname: rocketmq
+ restart: always
+ ports:
+ - "7397:7397"
+ - "9876:9876"
+ - "10909:10909"
+ - "10911:10911"
+ - "10912:10912"
+ volumes:
+ - ./data:/home/app/data
+ - /etc/localtime:/etc/localtime
+ - /var/run/docker.sock:/var/run/docker.sock
+ network_mode: host
diff --git a/pom.xml b/pom.xml
index b7e9ade..7a5cdbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,12 +61,12 @@
org.apache.rocketmq
rocketmq-client-java
- 5.0.0
+ 5.0.4
org.apache.rocketmq
rocketmq-spring-boot-starter
- 2.2.2
+ 2.2.0
diff --git a/xfg-dev-tech-app/pom.xml b/xfg-dev-tech-app/pom.xml
index 72d8bcf..a18f021 100644
--- a/xfg-dev-tech-app/pom.xml
+++ b/xfg-dev-tech-app/pom.xml
@@ -64,12 +64,10 @@
org.apache.rocketmq
rocketmq-client-java
- 5.0.4
org.apache.rocketmq
rocketmq-spring-boot-starter
- 2.2.0
@@ -78,6 +76,11 @@
xfg-dev-tech-infrastructure
1.0-SNAPSHOT
+
+ cn.bugstack
+ xfg-dev-tech-trigger
+ 1.0-SNAPSHOT
+
diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java
index f6e3155..dcede5f 100644
--- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java
+++ b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java
@@ -14,6 +14,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class Application {
public static void main(String[] args) {
+
/*
* 指定使用的日志框架,否则将会告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java
deleted file mode 100644
index 42940d6..0000000
--- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package cn.bugstack.xfg.dev.tech.mq;
-
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.springframework.stereotype.Component;
-
-@Component
-@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
-public class RocketMQConsumer implements RocketMQListener {
-
- @Override
- public void onMessage(String message) {
- System.out.println("Received message: " + message);
- // 处理消息逻辑
- }
-
-}
diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java
deleted file mode 100644
index 118cf7b..0000000
--- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package cn.bugstack.xfg.dev.tech.mq;
-
-import lombok.Setter;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class RocketMQProducer {
-
- @Setter(onMethod_ = @Autowired)
- private RocketMQTemplate rocketmqTemplate;
-
- public void sendMessage(String topic, String message) {
- rocketmqTemplate.convertAndSend(topic, message);
- }
-
-}
-
diff --git a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java
similarity index 59%
rename from xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java
rename to xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java
index dbd0581..0b259d0 100644
--- a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java
+++ b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java
@@ -1,27 +1,31 @@
package cn.bugstack.xfg.dev.tech.test;
-import cn.bugstack.xfg.dev.tech.mq.RocketMQProducer;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringRunner;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
-public class ApplicationTest {
+public class RocketMQTest {
- @Autowired
- private RocketMQProducer rocketMQProducer;
+ @Setter(onMethod_ = @Autowired)
+ private RocketMQTemplate rocketmqTemplate;
@Test
public void test() throws InterruptedException {
while (true) {
- rocketMQProducer.sendMessage("xfg-mq", "我是测试消息");
+ rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息");
Thread.sleep(3000);
}
}
+
}
diff --git a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java
index 4edc958..9b59b57 100644
--- a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java
+++ b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java
@@ -24,10 +24,10 @@ public class ISalaryAdjustApplyServiceTest {
private ISalaryAdjustApplyService salaryAdjustApplyService;
@Test
- public void test_execSalaryAdjust() {
+ public void test_execSalaryAdjust() throws InterruptedException {
AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
.employeeNumber("10000001")
- .orderId("100908977676002")
+ .orderId("100908977676003")
.employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
.employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
.adjustTotalAmount(new BigDecimal(100))
@@ -38,6 +38,8 @@ public class ISalaryAdjustApplyServiceTest {
String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
+
+ Thread.sleep(Integer.MAX_VALUE);
}
}
diff --git a/xfg-dev-tech-domain/pom.xml b/xfg-dev-tech-domain/pom.xml
index 73f3d55..10a919a 100644
--- a/xfg-dev-tech-domain/pom.xml
+++ b/xfg-dev-tech-domain/pom.xml
@@ -34,6 +34,14 @@
org.projectlombok
lombok
+
+
+
+ cn.bugstack
+ xfg-dev-tech-types
+ 1.0-SNAPSHOT
+
+
diff --git a/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java b/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java
new file mode 100644
index 0000000..014c39c
--- /dev/null
+++ b/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java
@@ -0,0 +1,29 @@
+package cn.bugstack.xfg.dev.tech.domain.salary.event;
+
+import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate;
+import cn.bugstack.xfg.frame.types.BaseEvent;
+import lombok.*;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.Date;
+
+/**
+ * @author Fuzhengwei bugstack.cn @小傅哥
+ * @description 消息
+ * @create 2023-07-29 10:45
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class SalaryAdjustEvent extends BaseEvent {
+
+ public static String TOPIC = "xfg-mq";
+
+ public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
+ SalaryAdjustEvent event = new SalaryAdjustEvent();
+ event.setId(RandomStringUtils.randomNumeric(11));
+ event.setTimestamp(new Date());
+ event.setData(adjustSalaryApplyOrderAggregate);
+ return event;
+ }
+
+}
diff --git a/xfg-dev-tech-infrastructure/pom.xml b/xfg-dev-tech-infrastructure/pom.xml
index 784e8a1..5a4b445 100644
--- a/xfg-dev-tech-infrastructure/pom.xml
+++ b/xfg-dev-tech-infrastructure/pom.xml
@@ -22,6 +22,15 @@
org.projectlombok
lombok
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
diff --git a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java
new file mode 100644
index 0000000..7e1feff
--- /dev/null
+++ b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java
@@ -0,0 +1,59 @@
+package cn.bugstack.xfg.dev.tech.infrastructure.event;
+
+import cn.bugstack.xfg.frame.types.BaseEvent;
+import com.alibaba.fastjson2.JSON;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Fuzhengwei bugstack.cn @小傅哥
+ * @description 事件发布,消息推送。你可以在这里扩展各类的消息推送方式,如;异步消息、延迟消息、顺序消息、事务消息。官网:
+ * @create 2023-07-29 09:51
+ */
+@Component
+@Slf4j
+public class EventPublisher {
+
+ @Setter(onMethod_ = @Autowired)
+ private RocketMQTemplate rocketmqTemplate;
+
+ /**
+ * 普通消息
+ *
+ * @param topic 主题
+ * @param message 消息
+ */
+ public void publish(String topic, BaseEvent> message) {
+ try {
+ String mqMessage = JSON.toJSONString(message);
+ log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage);
+ rocketmqTemplate.convertAndSend(topic, mqMessage);
+ } catch (Exception e) {
+ log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
+ // 大部分MQ发送失败后,会需要任务补偿
+ }
+ }
+
+ /**
+ * 延迟消息
+ *
+ * @param topic 主题
+ * @param message 消息
+ * @param delayTimeLevel 延迟时长
+ */
+ public void publishDelivery(String topic, BaseEvent> message, int delayTimeLevel) {
+ try {
+ String mqMessage = JSON.toJSONString(message);
+ log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage);
+ rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
+ } catch (Exception e) {
+ log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
+ // 大部分MQ发送失败后,会需要任务补偿
+ }
+ }
+
+}
diff --git a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java
index 0befee7..774ec4d 100644
--- a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java
+++ b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java
@@ -1,5 +1,6 @@
package cn.bugstack.xfg.dev.tech.infrastructure.repository;
+import cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent;
import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate;
import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeEntity;
import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeSalaryAdjustEntity;
@@ -7,6 +8,7 @@ import cn.bugstack.xfg.dev.tech.domain.salary.repository.ISalaryAdjustRepository
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeDAO;
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryAdjustDAO;
import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryDAO;
+import cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeePO;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryAdjustPO;
import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryPO;
@@ -26,6 +28,8 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository {
private IEmployeeSalaryDAO employeeSalaryDAO;
@Resource
private IEmployeeSalaryAdjustDAO employeeSalaryAdjustDAO;
+ @Resource
+ private EventPublisher eventPublisher;
/**
* Spring Boot 事务管理的级别可以通过 `@Transactional` 注解的 `isolation` 属性进行配置。常见的事务隔离级别有以下几种:
@@ -85,6 +89,13 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository {
// 写入流水
employeeSalaryAdjustDAO.insert(employeeSalaryAdjustPO);
+ /*
+ * 发送消息,实际应用常见建议
+ * 1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。
+ * 2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。
+ */
+ eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
+
return orderId;
}
diff --git a/xfg-dev-tech-trigger/pom.xml b/xfg-dev-tech-trigger/pom.xml
index ca22945..542f4cf 100644
--- a/xfg-dev-tech-trigger/pom.xml
+++ b/xfg-dev-tech-trigger/pom.xml
@@ -30,6 +30,15 @@
org.apache.commons
commons-lang3
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
diff --git a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java
index 45eb2c8..9a89e1b 100644
--- a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java
+++ b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java
@@ -14,26 +14,11 @@ import java.util.concurrent.ThreadPoolExecutor;
@RestController
public class Controller {
- @Resource
- private ThreadPoolExecutor threadPoolExecutor;
-
/**
* http://localhost:8090/success
*/
@RequestMapping("/success")
public Response success() {
- log.info("测试调用");
- try {
- // 随机休眠
- Thread.sleep(new Random().nextInt(1000));
- // 开启线程
- threadPoolExecutor.execute(() -> {
- log.info("开启线程");
- });
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- if (new Random().nextInt(100) == 1) throw new RuntimeException("异常");
return Response.builder()
.code(Constants.ResponseCode.SUCCESS.getCode())
.info(Constants.ResponseCode.SUCCESS.getInfo())
diff --git a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java
new file mode 100644
index 0000000..25417e3
--- /dev/null
+++ b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java
@@ -0,0 +1,23 @@
+package cn.bugstack.xfg.dev.tech.trigger.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Fuzhengwei bugstack.cn @小傅哥
+ * @description 接收消息
+ * @create 2023-07-29 11:22
+ */
+@Component
+@Slf4j
+@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
+public class SalaryAdjustMQListener implements RocketMQListener {
+
+ @Override
+ public void onMessage(String s) {
+ log.info("接收到MQ消息 {}", s);
+ }
+
+}
diff --git a/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java b/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java
new file mode 100644
index 0000000..45061ca
--- /dev/null
+++ b/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java
@@ -0,0 +1,19 @@
+package cn.bugstack.xfg.frame.types;
+
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * @author Fuzhengwei bugstack.cn @小傅哥
+ * @description
+ * @create 2023-07-29 10:47
+ */
+@Data
+public class BaseEvent {
+
+ private String id;
+ private Date timestamp;
+ private T data;
+
+}
--
GitLab