diff --git a/docs/docker/create-group.sh b/docs/docker/create-group.sh new file mode 100644 index 0000000000000000000000000000000000000000..2004b327285641f44799141c2d6dc255165d8f58 --- /dev/null +++ b/docs/docker/create-group.sh @@ -0,0 +1,2 @@ +# 命令创建 消费者组 Group xfg-group 你可以更换你需要的 +docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group diff --git a/docs/docker/create-topic-consumer.sh b/docs/docker/create-topic-consumer.sh deleted file mode 100644 index 0181b3b772a26e48cbb35eb9261fa0d6ce02373f..0000000000000000000000000000000000000000 --- a/docs/docker/create-topic-consumer.sh +++ /dev/null @@ -1,3 +0,0 @@ -docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group -docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq -docker exec -it rocketmq-broker sh mqadmin updateSubGroup -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -g test_group \ No newline at end of file diff --git a/docs/docker/create-topic-producer.sh b/docs/docker/create-topic-producer.sh deleted file mode 100644 index e31c2212c07d458fee30b2f2d27965ebba51d145..0000000000000000000000000000000000000000 --- a/docs/docker/create-topic-producer.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -it rocketmq-broker sh mqadmin updateSubGroup -n rocketmq-namesrv:9876 -g test-group -t xfg-dev-tech-rocketmq \ No newline at end of file diff --git a/docs/docker/create-topic.sh b/docs/docker/create-topic.sh index 9769877bf401c19c3fc6019245f2e3aef616c0ec..63393f6ed35c63f364251af542cd528ed29a33c3 100644 --- a/docs/docker/create-topic.sh +++ b/docs/docker/create-topic.sh @@ -1 +1,2 @@ -docker exec -it rocketmq-broker sh mqadmin updateTopic -c rocketmq-broker:10911 -n rocketmq-namesrv:9876 -t xfg-dev-tech-rocketmq +# 命令创建 主题消息 Topic xfg-mq 你可以更换你需要的 +docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq diff --git a/docs/docker/docker-compose-rocketmq.yml b/docs/docker/docker-compose-rocketmq.yml deleted file mode 100644 index 0e2f956e756c5fd2927f093dddfe0bf34e0d2d8e..0000000000000000000000000000000000000000 --- a/docs/docker/docker-compose-rocketmq.yml +++ /dev/null @@ -1,28 +0,0 @@ -# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d -# 启动服务 docker-compose up -d -# 关闭服务 docker-compose down -version: '3' -services: - # RocketMQ https://hub.docker.com/r/apache/rocketmq/tags - rocketmq-namesrv: - image: apache/rocketmq:4.9.7 - container_name: rocketmq-namesrv - command: sh mqnamesrv - ports: - - "9876:9876" - rocketmq-broker: - image: apache/rocketmq:4.9.7 - container_name: rocketmq-broker - command: sh mqbroker -n rocketmq-namesrv:9876 - ports: - - "10911:10911" - - "10909:10909" - depends_on: - - rocketmq-namesrv - rocketmq-dashboard: - image: apacherocketmq/rocketmq-dashboard - container_name: rocketmq-dashboard - ports: - - "8080:8080" - environment: - - "JAVA_OPTS=-Drocketmq.config.namesrvAddr=rocketmq-namesrv:9876" \ No newline at end of file diff --git a/docs/docker/rocketmq-docker-compose.yml b/docs/docker/rocketmq-docker-compose-mac-amd-arm.yml similarity index 56% rename from docs/docker/rocketmq-docker-compose.yml rename to docs/docker/rocketmq-docker-compose-mac-amd-arm.yml index 16adb23e73f72a27fa28dedf2ac856b1f980d5fa..d1203989e55c8c2dfcf863b3523948f88bdf616f 100644 --- a/docs/docker/rocketmq-docker-compose.yml +++ b/docs/docker/rocketmq-docker-compose-mac-amd-arm.yml @@ -1,8 +1,9 @@ version: '3' services: # https://hub.docker.com/r/xuchengen/rocketmq - # broker.conf brokerIP1=127.0.0.1 - # console/config/application.properties server.port=9009 + # 注意修改项; + # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1 + # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口 rocketmq: image: livinphp/rocketmq:5.1.0 container_name: rocketmq diff --git a/docs/docker/rocketmq-docker-compose-mac-amd.yml b/docs/docker/rocketmq-docker-compose-mac-amd.yml new file mode 100644 index 0000000000000000000000000000000000000000..b8c7e667c156d228d15af3d82b5838b3f783e276 --- /dev/null +++ b/docs/docker/rocketmq-docker-compose-mac-amd.yml @@ -0,0 +1,18 @@ +version: '3' +services: + rocketmq: + image: xuchengen/rocketmq:latest + container_name: rocketmq + hostname: rocketmq + restart: always + ports: + - "7397:7397" + - "9876:9876" + - "10909:10909" + - "10911:10911" + - "10912:10912" + volumes: + - ./data:/home/app/data + - /etc/localtime:/etc/localtime + - /var/run/docker.sock:/var/run/docker.sock + network_mode: host diff --git a/docs/docker/rocketmq-docker-compose-windows.yml b/docs/docker/rocketmq-docker-compose-windows.yml new file mode 100644 index 0000000000000000000000000000000000000000..b8c7e667c156d228d15af3d82b5838b3f783e276 --- /dev/null +++ b/docs/docker/rocketmq-docker-compose-windows.yml @@ -0,0 +1,18 @@ +version: '3' +services: + rocketmq: + image: xuchengen/rocketmq:latest + container_name: rocketmq + hostname: rocketmq + restart: always + ports: + - "7397:7397" + - "9876:9876" + - "10909:10909" + - "10911:10911" + - "10912:10912" + volumes: + - ./data:/home/app/data + - /etc/localtime:/etc/localtime + - /var/run/docker.sock:/var/run/docker.sock + network_mode: host diff --git a/pom.xml b/pom.xml index b7e9ade45032b926fb24981ca21c073579d3e9df..7a5cdbb39699e28714107725f903733b2a4b9527 100644 --- a/pom.xml +++ b/pom.xml @@ -61,12 +61,12 @@ org.apache.rocketmq rocketmq-client-java - 5.0.0 + 5.0.4 org.apache.rocketmq rocketmq-spring-boot-starter - 2.2.2 + 2.2.0 diff --git a/xfg-dev-tech-app/pom.xml b/xfg-dev-tech-app/pom.xml index 72d8bcf05c3886ddb0b1992937dea27532b8c6c1..a18f0212016f619ba91d37aaa27b5f73642e3afc 100644 --- a/xfg-dev-tech-app/pom.xml +++ b/xfg-dev-tech-app/pom.xml @@ -64,12 +64,10 @@ org.apache.rocketmq rocketmq-client-java - 5.0.4 org.apache.rocketmq rocketmq-spring-boot-starter - 2.2.0 @@ -78,6 +76,11 @@ xfg-dev-tech-infrastructure 1.0-SNAPSHOT + + cn.bugstack + xfg-dev-tech-trigger + 1.0-SNAPSHOT + diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java index f6e3155d78b38a8019cc85b61cc0cf66ccdbeeaa..dcede5f9cb6bc696a4f4feeab4675263dd1c504b 100644 --- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java +++ b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/Application.java @@ -14,6 +14,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class Application { public static void main(String[] args) { + /* * 指定使用的日志框架,否则将会告警 * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap). diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java deleted file mode 100644 index 42940d686d0c7dff7004ae9436975e16c7419e8d..0000000000000000000000000000000000000000 --- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQConsumer.java +++ /dev/null @@ -1,17 +0,0 @@ -package cn.bugstack.xfg.dev.tech.mq; - -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.stereotype.Component; - -@Component -@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group") -public class RocketMQConsumer implements RocketMQListener { - - @Override - public void onMessage(String message) { - System.out.println("Received message: " + message); - // 处理消息逻辑 - } - -} diff --git a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java b/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java deleted file mode 100644 index 118cf7ba8f3d7181ee82fda708ff7b87d78c2633..0000000000000000000000000000000000000000 --- a/xfg-dev-tech-app/src/main/java/cn/bugstack/xfg/dev/tech/mq/RocketMQProducer.java +++ /dev/null @@ -1,19 +0,0 @@ -package cn.bugstack.xfg.dev.tech.mq; - -import lombok.Setter; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class RocketMQProducer { - - @Setter(onMethod_ = @Autowired) - private RocketMQTemplate rocketmqTemplate; - - public void sendMessage(String topic, String message) { - rocketmqTemplate.convertAndSend(topic, message); - } - -} - diff --git a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java similarity index 59% rename from xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java rename to xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java index dbd0581d27fd2921a91bcf40f0383abadddc3ab7..0b259d034d3030ff8e96143852e009d87ca3fffc 100644 --- a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/ApplicationTest.java +++ b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/RocketMQTest.java @@ -1,27 +1,31 @@ package cn.bugstack.xfg.dev.tech.test; -import cn.bugstack.xfg.dev.tech.mq.RocketMQProducer; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.stereotype.Component; import org.springframework.test.context.junit4.SpringRunner; @Slf4j @RunWith(SpringRunner.class) @SpringBootTest -public class ApplicationTest { +public class RocketMQTest { - @Autowired - private RocketMQProducer rocketMQProducer; + @Setter(onMethod_ = @Autowired) + private RocketMQTemplate rocketmqTemplate; @Test public void test() throws InterruptedException { while (true) { - rocketMQProducer.sendMessage("xfg-mq", "我是测试消息"); + rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息"); Thread.sleep(3000); } } + } diff --git a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java index 4edc958512a51577b4d3868bbc4423e0c5c303b3..9b59b5774fad9f50b9ef622fae6907708233c232 100644 --- a/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java +++ b/xfg-dev-tech-app/src/test/java/cn/bugstack/xfg/dev/tech/test/domain/ISalaryAdjustApplyServiceTest.java @@ -24,10 +24,10 @@ public class ISalaryAdjustApplyServiceTest { private ISalaryAdjustApplyService salaryAdjustApplyService; @Test - public void test_execSalaryAdjust() { + public void test_execSalaryAdjust() throws InterruptedException { AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder() .employeeNumber("10000001") - .orderId("100908977676002") + .orderId("100908977676003") .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build()) .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder() .adjustTotalAmount(new BigDecimal(100)) @@ -38,6 +38,8 @@ public class ISalaryAdjustApplyServiceTest { String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate); log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId); + + Thread.sleep(Integer.MAX_VALUE); } } diff --git a/xfg-dev-tech-domain/pom.xml b/xfg-dev-tech-domain/pom.xml index 73f3d55dba671027508ccfe791c12408700e140a..10a919aba59eb19f510cf455de33c6a1298d298e 100644 --- a/xfg-dev-tech-domain/pom.xml +++ b/xfg-dev-tech-domain/pom.xml @@ -34,6 +34,14 @@ org.projectlombok lombok + + + + cn.bugstack + xfg-dev-tech-types + 1.0-SNAPSHOT + + diff --git a/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java b/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..014c39cefed9bc05a5a89baf9431003efcd59baa --- /dev/null +++ b/xfg-dev-tech-domain/src/main/java/cn/bugstack/xfg/dev/tech/domain/salary/event/SalaryAdjustEvent.java @@ -0,0 +1,29 @@ +package cn.bugstack.xfg.dev.tech.domain.salary.event; + +import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate; +import cn.bugstack.xfg.frame.types.BaseEvent; +import lombok.*; +import org.apache.commons.lang3.RandomStringUtils; + +import java.util.Date; + +/** + * @author Fuzhengwei bugstack.cn @小傅哥 + * @description 消息 + * @create 2023-07-29 10:45 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class SalaryAdjustEvent extends BaseEvent { + + public static String TOPIC = "xfg-mq"; + + public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) { + SalaryAdjustEvent event = new SalaryAdjustEvent(); + event.setId(RandomStringUtils.randomNumeric(11)); + event.setTimestamp(new Date()); + event.setData(adjustSalaryApplyOrderAggregate); + return event; + } + +} diff --git a/xfg-dev-tech-infrastructure/pom.xml b/xfg-dev-tech-infrastructure/pom.xml index 784e8a145261a0ecc69872cd2f381691fcae9099..5a4b44529ab8acd0df8b2875633baaca08c29dc1 100644 --- a/xfg-dev-tech-infrastructure/pom.xml +++ b/xfg-dev-tech-infrastructure/pom.xml @@ -22,6 +22,15 @@ org.projectlombok lombok + + + org.apache.rocketmq + rocketmq-client-java + + + org.apache.rocketmq + rocketmq-spring-boot-starter + diff --git a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java new file mode 100644 index 0000000000000000000000000000000000000000..7e1feffd2ca8b64d7b810d915608795a4882f522 --- /dev/null +++ b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/event/EventPublisher.java @@ -0,0 +1,59 @@ +package cn.bugstack.xfg.dev.tech.infrastructure.event; + +import cn.bugstack.xfg.frame.types.BaseEvent; +import com.alibaba.fastjson2.JSON; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +/** + * @author Fuzhengwei bugstack.cn @小傅哥 + * @description 事件发布,消息推送。你可以在这里扩展各类的消息推送方式,如;异步消息、延迟消息、顺序消息、事务消息。官网: + * @create 2023-07-29 09:51 + */ +@Component +@Slf4j +public class EventPublisher { + + @Setter(onMethod_ = @Autowired) + private RocketMQTemplate rocketmqTemplate; + + /** + * 普通消息 + * + * @param topic 主题 + * @param message 消息 + */ + public void publish(String topic, BaseEvent message) { + try { + String mqMessage = JSON.toJSONString(message); + log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage); + rocketmqTemplate.convertAndSend(topic, mqMessage); + } catch (Exception e) { + log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e); + // 大部分MQ发送失败后,会需要任务补偿 + } + } + + /** + * 延迟消息 + * + * @param topic 主题 + * @param message 消息 + * @param delayTimeLevel 延迟时长 + */ + public void publishDelivery(String topic, BaseEvent message, int delayTimeLevel) { + try { + String mqMessage = JSON.toJSONString(message); + log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage); + rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel); + } catch (Exception e) { + log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e); + // 大部分MQ发送失败后,会需要任务补偿 + } + } + +} diff --git a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java index 0befee71e522ceb465e47be9fa47550c375cb3db..774ec4d2ade48c4cd2c06514e74d2f75e7807ab8 100644 --- a/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java +++ b/xfg-dev-tech-infrastructure/src/main/java/cn/bugstack/xfg/dev/tech/infrastructure/repository/SalaryAdjustRepository.java @@ -1,5 +1,6 @@ package cn.bugstack.xfg.dev.tech.infrastructure.repository; +import cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent; import cn.bugstack.xfg.dev.tech.domain.salary.model.aggreate.AdjustSalaryApplyOrderAggregate; import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeEntity; import cn.bugstack.xfg.dev.tech.domain.salary.model.entity.EmployeeSalaryAdjustEntity; @@ -7,6 +8,7 @@ import cn.bugstack.xfg.dev.tech.domain.salary.repository.ISalaryAdjustRepository import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeDAO; import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryAdjustDAO; import cn.bugstack.xfg.dev.tech.infrastructure.dao.IEmployeeSalaryDAO; +import cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher; import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeePO; import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryAdjustPO; import cn.bugstack.xfg.dev.tech.infrastructure.po.EmployeeSalaryPO; @@ -26,6 +28,8 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository { private IEmployeeSalaryDAO employeeSalaryDAO; @Resource private IEmployeeSalaryAdjustDAO employeeSalaryAdjustDAO; + @Resource + private EventPublisher eventPublisher; /** * Spring Boot 事务管理的级别可以通过 `@Transactional` 注解的 `isolation` 属性进行配置。常见的事务隔离级别有以下几种: @@ -85,6 +89,13 @@ public class SalaryAdjustRepository implements ISalaryAdjustRepository { // 写入流水 employeeSalaryAdjustDAO.insert(employeeSalaryAdjustPO); + /* + * 发送消息,实际应用常见建议 + * 1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。 + * 2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。 + */ + eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate)); + return orderId; } diff --git a/xfg-dev-tech-trigger/pom.xml b/xfg-dev-tech-trigger/pom.xml index ca22945045b6a183c5fddbd5d169c39a2a394d3b..542f4cf15a31ee058176871d38bfe6a8e73f883f 100644 --- a/xfg-dev-tech-trigger/pom.xml +++ b/xfg-dev-tech-trigger/pom.xml @@ -30,6 +30,15 @@ org.apache.commons commons-lang3 + + + org.apache.rocketmq + rocketmq-client-java + + + org.apache.rocketmq + rocketmq-spring-boot-starter + diff --git a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java index 45eb2c802b38d33d829cabf8a31b9767cd4539ec..9a89e1b7f75a422c749839cfde381a82739b683d 100644 --- a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java +++ b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/http/Controller.java @@ -14,26 +14,11 @@ import java.util.concurrent.ThreadPoolExecutor; @RestController public class Controller { - @Resource - private ThreadPoolExecutor threadPoolExecutor; - /** * http://localhost:8090/success */ @RequestMapping("/success") public Response success() { - log.info("测试调用"); - try { - // 随机休眠 - Thread.sleep(new Random().nextInt(1000)); - // 开启线程 - threadPoolExecutor.execute(() -> { - log.info("开启线程"); - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (new Random().nextInt(100) == 1) throw new RuntimeException("异常"); return Response.builder() .code(Constants.ResponseCode.SUCCESS.getCode()) .info(Constants.ResponseCode.SUCCESS.getInfo()) diff --git a/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java new file mode 100644 index 0000000000000000000000000000000000000000..25417e3b904a6aedbf3b9b9640e054a5f8402aee --- /dev/null +++ b/xfg-dev-tech-trigger/src/main/java/cn/bugstack/xfg/dev/tech/trigger/mq/SalaryAdjustMQListener.java @@ -0,0 +1,23 @@ +package cn.bugstack.xfg.dev.tech.trigger.mq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author Fuzhengwei bugstack.cn @小傅哥 + * @description 接收消息 + * @create 2023-07-29 11:22 + */ +@Component +@Slf4j +@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group") +public class SalaryAdjustMQListener implements RocketMQListener { + + @Override + public void onMessage(String s) { + log.info("接收到MQ消息 {}", s); + } + +} diff --git a/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java b/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..45061cad049d35dd86b48126235a76488e1a5c00 --- /dev/null +++ b/xfg-dev-tech-types/src/main/java/cn/bugstack/xfg/frame/types/BaseEvent.java @@ -0,0 +1,19 @@ +package cn.bugstack.xfg.frame.types; + +import lombok.Data; + +import java.util.Date; + +/** + * @author Fuzhengwei bugstack.cn @小傅哥 + * @description + * @create 2023-07-29 10:47 + */ +@Data +public class BaseEvent { + + private String id; + private Date timestamp; + private T data; + +}