提交 5db822e7 编写于 作者: 小傅哥's avatar 小傅哥

小傅哥,feat:rocketmq 配置使用

上级 1394a9fd
...@@ -86,6 +86,7 @@ services: ...@@ -86,6 +86,7 @@ services:
container_name: mysql-job-dbdata container_name: mysql-job-dbdata
volumes: volumes:
- /var/lib/mysql - /var/lib/mysql
# Redis # Redis
redis: redis:
image: redis:7.2.0 image: redis:7.2.0
...@@ -98,6 +99,25 @@ services: ...@@ -98,6 +99,25 @@ services:
networks: networks:
- my-network - my-network
# https://hub.docker.com/r/xuchengen/rocketmq
# 注意修改项;
# 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
# 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
rocketmq:
image: livinphp/rocketmq:5.1.0
container_name: rocketmq
ports:
- 8080:8080
- 9876:9876
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./rocketmq/data:/home/app/data
environment:
TZ: "Asia/Shanghai"
NAMESRV_ADDR: "rocketmq:9876"
networks: networks:
my-network: my-network:
driver: bridge driver: bridge
\ No newline at end of file
# 命令创建 消费者组 Group xfg-group 你可以更换你需要的
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group
# 命令创建 主题消息 Topic xfg-mq 你可以更换你需要的
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
server.address=0.0.0.0
server.port=8080
### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-dashboard
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.level.root=INFO
logging.config=./config/logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#timeout for mqadminExt, default 5000ms
rocketmq.config.timeoutMillis=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=./store
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=true
#set the accessKey and secretKey if you used acl
#rocketmq.config.accessKey=
#rocketmq.config.secretKey=
rocketmq.config.useTLS=false
/home/app/data/console/config
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder charset="UTF-8">
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
</encoder>
</appender>
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}${file.separator}data${file.separator}logs${file.separator}consolelogs${file.separator}rocketmq-console.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${user.home}${file.separator}data${file.separator}logs${file.separator}consolelogs${file.separator}rocketmq-console-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>104857600</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<MaxHistory>10</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
</configuration>
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file supports hot change, any change will be auto-reloaded without Dashboard restarting.
# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin)
# Define Admin
admin=admin,1
# Define Users
normal=normal
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名称
brokerClusterName = DefaultCluster
# BROKER 名称
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 删除文件时间点,默认凌晨 4 点
deleteWhen = 04
# 文件保留时间,默认 48 小时
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER为异步主节点,SYNC_MASTER为同步主节点,SLAVE为从节点
brokerRole = ASYNC_MASTER
# 刷新数据到磁盘的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
\ No newline at end of file
...@@ -136,6 +136,16 @@ ...@@ -136,6 +136,16 @@
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
<version>2.4.0</version> <version>2.4.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 工程模块 --> <!-- 工程模块 -->
<dependency> <dependency>
......
...@@ -106,6 +106,14 @@ ...@@ -106,6 +106,14 @@
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<!-- 工程模块;启动依赖 trigger->domain, infrastructure--> <!-- 工程模块;启动依赖 trigger->domain, infrastructure-->
<dependency> <dependency>
......
...@@ -52,6 +52,29 @@ dubbo: ...@@ -52,6 +52,29 @@ dubbo:
scan: scan:
base-packages: cn.bugstack.api base-packages: cn.bugstack.api
# RocketMQ 配置,在docs/dev-ops下,有创建 xfg-group 脚本,也有场景mq消息的脚本
rocketmq:
name-server: 127.0.0.1:9876
consumer:
group: xfg-group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: xfg-group
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
# xxl-job https://www.xuxueli.com/xxl-job/#%E6%AD%A5%E9%AA%A4%E4%B8%80%EF%BC%9A%E8%B0%83%E5%BA%A6%E4%B8%AD%E5%BF%83%E9%85%8D%E7%BD%AE%EF%BC%9A # xxl-job https://www.xuxueli.com/xxl-job/#%E6%AD%A5%E9%AA%A4%E4%B8%80%EF%BC%9A%E8%B0%83%E5%BA%A6%E4%B8%AD%E5%BF%83%E9%85%8D%E7%BD%AE%EF%BC%9A
xxl: xxl:
job: job:
......
package cn.bugstack.test.infrastructure.redis; package cn.bugstack.test.infrastructure.persistent;
import cn.bugstack.infrastructure.redis.IRedisService; import cn.bugstack.infrastructure.redis.IRedisService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
......
...@@ -38,7 +38,14 @@ ...@@ -38,7 +38,14 @@
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<!-- 系统模块 --> <!-- 系统模块 -->
<dependency> <dependency>
......
package cn.bugstack.trigger.job; package cn.bugstack.trigger.job;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
...@@ -11,10 +14,15 @@ import org.springframework.stereotype.Component; ...@@ -11,10 +14,15 @@ import org.springframework.stereotype.Component;
@Component @Component
public class XXLJob { public class XXLJob {
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
@XxlJob("demoJobHandler") @XxlJob("demoJobHandler")
public void doJob() { public void doJob() {
// 可以在任务中,调用一些业务方法逻辑的实现,如定时扫描超时未支付订单为关单处理,恢复库存 // 可以在任务中,调用一些业务方法逻辑的实现,如定时扫描超时未支付订单为关单处理,恢复库存
log.info("执行任务 - XXL-Job - 01"); log.info("执行任务 - XXL-Job - 01 发送一条MQ消息");
// 发送MQ消息
rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息");
} }
} }
package cn.bugstack.trigger.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ 接收消息
* @author Fuzhengwei bugstack.cn @小傅哥
*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class MQListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到RocketMQ消息 {}", s);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册