From 5db822e71528db36aa4dfb4ab0dd82fed50776a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E5=82=85=E5=93=A5?= <184172133@qq.com> Date: Sat, 4 Nov 2023 15:08:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E5=82=85=E5=93=A5=EF=BC=8Cfeat?= =?UTF-8?q?=EF=BC=9Arocketmq=20=E9=85=8D=E7=BD=AE=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/dev-ops/environment/docker-compose.yml | 20 +++++++ .../environment/rocketmq/create-group.sh | 2 + .../environment/rocketmq/create-topic.sh | 2 + .../console/config/application.properties | 54 +++++++++++++++++++ .../rocketmq/data/console/config/config | 1 + .../rocketmq/data/console/config/logback.xml | 49 +++++++++++++++++ .../data/console/store/users.properties | 25 +++++++++ .../environment/rocketmq/data/logs/logs.txt | 0 .../rocketmq/data/rocketmq/conf/broker.conf | 33 ++++++++++++ pom.xml | 10 ++++ xfg-frame-archetype-std-app/pom.xml | 8 +++ .../src/main/resources/application-dev.yml | 23 ++++++++ .../test/infrastructure/redis/RedisTest.java | 2 +- xfg-frame-archetype-std-trigger/pom.xml | 9 +++- .../java/cn/bugstack/trigger/job/XXLJob.java | 10 +++- .../bugstack/trigger/listener/MQListener.java | 22 ++++++++ 16 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 docs/dev-ops/environment/rocketmq/create-group.sh create mode 100644 docs/dev-ops/environment/rocketmq/create-topic.sh create mode 100644 docs/dev-ops/environment/rocketmq/data/console/config/application.properties create mode 120000 docs/dev-ops/environment/rocketmq/data/console/config/config create mode 100644 docs/dev-ops/environment/rocketmq/data/console/config/logback.xml create mode 100644 docs/dev-ops/environment/rocketmq/data/console/store/users.properties create mode 100644 docs/dev-ops/environment/rocketmq/data/logs/logs.txt create mode 100644 docs/dev-ops/environment/rocketmq/data/rocketmq/conf/broker.conf create mode 100644 xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/listener/MQListener.java diff --git a/docs/dev-ops/environment/docker-compose.yml b/docs/dev-ops/environment/docker-compose.yml index 2dd8372..8713f33 100644 --- a/docs/dev-ops/environment/docker-compose.yml +++ b/docs/dev-ops/environment/docker-compose.yml @@ -86,6 +86,7 @@ services: container_name: mysql-job-dbdata volumes: - /var/lib/mysql + # Redis redis: image: redis:7.2.0 @@ -98,6 +99,25 @@ services: networks: - my-network + # https://hub.docker.com/r/xuchengen/rocketmq + # 注意修改项; + # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1 + # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口 + rocketmq: + image: livinphp/rocketmq:5.1.0 + container_name: rocketmq + ports: + - 8080:8080 + - 9876:9876 + - 10909:10909 + - 10911:10911 + - 10912:10912 + volumes: + - ./rocketmq/data:/home/app/data + environment: + TZ: "Asia/Shanghai" + NAMESRV_ADDR: "rocketmq:9876" + networks: my-network: driver: bridge \ No newline at end of file diff --git a/docs/dev-ops/environment/rocketmq/create-group.sh b/docs/dev-ops/environment/rocketmq/create-group.sh new file mode 100644 index 0000000..2004b32 --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/create-group.sh @@ -0,0 +1,2 @@ +# 命令创建 消费者组 Group xfg-group 你可以更换你需要的 +docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group diff --git a/docs/dev-ops/environment/rocketmq/create-topic.sh b/docs/dev-ops/environment/rocketmq/create-topic.sh new file mode 100644 index 0000000..63393f6 --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/create-topic.sh @@ -0,0 +1,2 @@ +# 命令创建 主题消息 Topic xfg-mq 你可以更换你需要的 +docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq diff --git a/docs/dev-ops/environment/rocketmq/data/console/config/application.properties b/docs/dev-ops/environment/rocketmq/data/console/config/application.properties new file mode 100644 index 0000000..00936b3 --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/data/console/config/application.properties @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server.address=0.0.0.0 +server.port=8080 + +### SSL setting +#server.ssl.key-store=classpath:rmqcngkeystore.jks +#server.ssl.key-store-password=rocketmq +#server.ssl.keyStoreType=PKCS12 +#server.ssl.keyAlias=rmqcngkey + +#spring.application.index=true +spring.application.name=rocketmq-dashboard +spring.http.encoding.charset=UTF-8 +spring.http.encoding.enabled=true +spring.http.encoding.force=true +logging.level.root=INFO +logging.config=./config/logback.xml +#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 +rocketmq.config.namesrvAddr= +#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true +rocketmq.config.isVIPChannel= +#timeout for mqadminExt, default 5000ms +rocketmq.config.timeoutMillis= +#rocketmq-console's data path:dashboard/monitor +rocketmq.config.dataPath=./store +#set it false if you don't want use dashboard.default true +rocketmq.config.enableDashBoardCollect=true +#set the message track trace topic if you don't want use the default one +rocketmq.config.msgTrackTopicName= +rocketmq.config.ticketKey=ticket + +#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required +rocketmq.config.loginRequired=true + +#set the accessKey and secretKey if you used acl +#rocketmq.config.accessKey= +#rocketmq.config.secretKey= +rocketmq.config.useTLS=false diff --git a/docs/dev-ops/environment/rocketmq/data/console/config/config b/docs/dev-ops/environment/rocketmq/data/console/config/config new file mode 120000 index 0000000..a367cfd --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/data/console/config/config @@ -0,0 +1 @@ +/home/app/data/console/config \ No newline at end of file diff --git a/docs/dev-ops/environment/rocketmq/data/console/config/logback.xml b/docs/dev-ops/environment/rocketmq/data/console/config/logback.xml new file mode 100644 index 0000000..d3b20d1 --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/data/console/config/logback.xml @@ -0,0 +1,49 @@ + + + + + + + [%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n + + + + + ${user.home}${file.separator}data${file.separator}logs${file.separator}consolelogs${file.separator}rocketmq-console.log + true + + ${user.home}${file.separator}data${file.separator}logs${file.separator}consolelogs${file.separator}rocketmq-console-%d{yyyy-MM-dd}.%i.log + + 104857600 + + 10 + + + [%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n + UTF-8 + + + + + + + + + \ No newline at end of file diff --git a/docs/dev-ops/environment/rocketmq/data/console/store/users.properties b/docs/dev-ops/environment/rocketmq/data/console/store/users.properties new file mode 100644 index 0000000..c18849b --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/data/console/store/users.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file supports hot change, any change will be auto-reloaded without Dashboard restarting. +# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin) + +# Define Admin +admin=admin,1 + +# Define Users +normal=normal \ No newline at end of file diff --git a/docs/dev-ops/environment/rocketmq/data/logs/logs.txt b/docs/dev-ops/environment/rocketmq/data/logs/logs.txt new file mode 100644 index 0000000..e69de29 diff --git a/docs/dev-ops/environment/rocketmq/data/rocketmq/conf/broker.conf b/docs/dev-ops/environment/rocketmq/data/rocketmq/conf/broker.conf new file mode 100644 index 0000000..9e6bbea --- /dev/null +++ b/docs/dev-ops/environment/rocketmq/data/rocketmq/conf/broker.conf @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# 集群名称 +brokerClusterName = DefaultCluster +# BROKER 名称 +brokerName = broker-a +# 0 表示 Master, > 0 表示 Slave +brokerId = 0 +# 删除文件时间点,默认凌晨 4 点 +deleteWhen = 04 +# 文件保留时间,默认 48 小时 +fileReservedTime = 48 +# BROKER 角色 ASYNC_MASTER为异步主节点,SYNC_MASTER为同步主节点,SLAVE为从节点 +brokerRole = ASYNC_MASTER +# 刷新数据到磁盘的方式,ASYNC_FLUSH 刷新 +flushDiskType = ASYNC_FLUSH +# 存储路径 +storePathRootDir = /home/app/data/rocketmq/store +# IP地址 +brokerIP1 = 127.0.0.1 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7301d74..706cf0e 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,16 @@ xxl-job-core 2.4.0 + + org.apache.rocketmq + rocketmq-client-java + 5.0.4 + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.0 + diff --git a/xfg-frame-archetype-std-app/pom.xml b/xfg-frame-archetype-std-app/pom.xml index 2932a86..650f6d4 100644 --- a/xfg-frame-archetype-std-app/pom.xml +++ b/xfg-frame-archetype-std-app/pom.xml @@ -106,6 +106,14 @@ com.xuxueli xxl-job-core + + org.apache.rocketmq + rocketmq-client-java + + + org.apache.rocketmq + rocketmq-spring-boot-starter + diff --git a/xfg-frame-archetype-std-app/src/main/resources/application-dev.yml b/xfg-frame-archetype-std-app/src/main/resources/application-dev.yml index 95db537..f258824 100644 --- a/xfg-frame-archetype-std-app/src/main/resources/application-dev.yml +++ b/xfg-frame-archetype-std-app/src/main/resources/application-dev.yml @@ -52,6 +52,29 @@ dubbo: scan: base-packages: cn.bugstack.api +# RocketMQ 配置,在docs/dev-ops下,有创建 xfg-group 脚本,也有场景mq消息的脚本 +rocketmq: + name-server: 127.0.0.1:9876 + consumer: + group: xfg-group + # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值 + pull-batch-size: 10 + producer: + # 发送同一类消息的设置为同一个group,保证唯一 + group: xfg-group + # 发送消息超时时间,默认3000 + sendMessageTimeout: 10000 + # 发送消息失败重试次数,默认2 + retryTimesWhenSendFailed: 2 + # 异步消息重试此处,默认2 + retryTimesWhenSendAsyncFailed: 2 + # 消息最大长度,默认1024 * 1024 * 4(默认4M) + maxMessageSize: 4096 + # 压缩消息阈值,默认4k(1024 * 4) + compressMessageBodyThreshold: 4096 + # 是否在内部发送失败时重试另一个broker,默认false + retryNextServer: false + # xxl-job https://www.xuxueli.com/xxl-job/#%E6%AD%A5%E9%AA%A4%E4%B8%80%EF%BC%9A%E8%B0%83%E5%BA%A6%E4%B8%AD%E5%BF%83%E9%85%8D%E7%BD%AE%EF%BC%9A xxl: job: diff --git a/xfg-frame-archetype-std-app/src/test/java/cn/bugstack/test/infrastructure/redis/RedisTest.java b/xfg-frame-archetype-std-app/src/test/java/cn/bugstack/test/infrastructure/redis/RedisTest.java index 33b995d..41c2e7f 100644 --- a/xfg-frame-archetype-std-app/src/test/java/cn/bugstack/test/infrastructure/redis/RedisTest.java +++ b/xfg-frame-archetype-std-app/src/test/java/cn/bugstack/test/infrastructure/redis/RedisTest.java @@ -1,4 +1,4 @@ -package cn.bugstack.test.infrastructure.redis; +package cn.bugstack.test.infrastructure.persistent; import cn.bugstack.infrastructure.redis.IRedisService; import lombok.extern.slf4j.Slf4j; diff --git a/xfg-frame-archetype-std-trigger/pom.xml b/xfg-frame-archetype-std-trigger/pom.xml index 58ae619..d4d8180 100644 --- a/xfg-frame-archetype-std-trigger/pom.xml +++ b/xfg-frame-archetype-std-trigger/pom.xml @@ -38,7 +38,14 @@ com.xuxueli xxl-job-core - + + org.apache.rocketmq + rocketmq-client-java + + + org.apache.rocketmq + rocketmq-spring-boot-starter + diff --git a/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/job/XXLJob.java b/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/job/XXLJob.java index 1559fef..0998d7c 100644 --- a/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/job/XXLJob.java +++ b/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/job/XXLJob.java @@ -1,7 +1,10 @@ package cn.bugstack.trigger.job; import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -11,10 +14,15 @@ import org.springframework.stereotype.Component; @Component public class XXLJob { + @Setter(onMethod_ = @Autowired) + private RocketMQTemplate rocketmqTemplate; + @XxlJob("demoJobHandler") public void doJob() { // 可以在任务中,调用一些业务方法逻辑的实现,如定时扫描超时未支付订单为关单处理,恢复库存 - log.info("执行任务 - XXL-Job - 01"); + log.info("执行任务 - XXL-Job - 01 发送一条MQ消息"); + // 发送MQ消息 + rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息"); } } diff --git a/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/listener/MQListener.java b/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/listener/MQListener.java new file mode 100644 index 0000000..5d3a299 --- /dev/null +++ b/xfg-frame-archetype-std-trigger/src/main/java/cn/bugstack/trigger/listener/MQListener.java @@ -0,0 +1,22 @@ +package cn.bugstack.trigger.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * RocketMQ 接收消息 + * @author Fuzhengwei bugstack.cn @小傅哥 + */ +@Component +@Slf4j +@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group") +public class MQListener implements RocketMQListener { + + @Override + public void onMessage(String s) { + log.info("接收到RocketMQ消息 {}", s); + } + +} -- GitLab