Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
quickmsg
smqtt
提交
97aefe43
S
smqtt
项目概览
quickmsg
/
smqtt
通知
3
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
smqtt
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
97aefe43
编写于
7月 11, 2021
作者:
1ssqq1lxr
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
doc 修改
上级
804ce7f0
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
185 addition
and
222 deletion
+185
-222
README.md
README.md
+177
-64
docs/http/2.fs.md
docs/http/2.fs.md
+0
-155
icon/admin.png
icon/admin.png
+0
-0
smqtt-common/src/main/java/io/github/quickmsg/common/message/HttpPublishMessage.java
...io/github/quickmsg/common/message/HttpPublishMessage.java
+2
-1
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/PublishActor.java
...ava/io/github/quickmsg/core/http/actors/PublishActor.java
+6
-2
未找到文件。
README.md
浏览文件 @
97aefe43
...
...
@@ -39,18 +39,30 @@ SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持单机部署
```
markdown
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.reactivePasswordAuth((U,P)->true)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).accessLog(true).build())
.build()
.startAwait();
Bootstrap.builder()
.rootLevel(Level.INFO)
.wiretap(false)
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> { })//netty options设置
.childOptions(channelOptionMap -> { }) //netty childOptions设置
.highWaterMark(1000000)
.reactivePasswordAuth((U, P) -> true)
.lowWaterMark(1000)
.ssl(false)
.sslContext(new SslContext("crt", "key"))
.isWebsocket(true)
.httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
.clusterConfig(
ClusterConfig.builder()
.clustered(false)
.port(7773)
.nodeName("node-2")
.clusterUrl("127.0.0.1:7771,127.0.0.1:7772")
.build()
)
.build()
.startAwait();
```
...
...
@@ -59,22 +71,30 @@ SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持单机部署
```
markdown
Bootstrap bootstrap =
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).accessLog(true).build())
.build()
.start().block();
assert bootstrap != null;
// 关闭服务
bootstrap.shutdown();
Bootstrap bootstrap = Bootstrap.builder()
.rootLevel(Level.INFO)
.wiretap(false)
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> { })//netty options设置
.childOptions(channelOptionMap -> { }) //netty childOptions设置
.highWaterMark(1000000)
.reactivePasswordAuth((U, P) -> true)
.lowWaterMark(1000)
.ssl(false)
.sslContext(new SslContext("crt", "key"))
.isWebsocket(true)
.httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
.clusterConfig(
ClusterConfig.builder()
.clustered(false)
.port(7773)
.nodeName("node-2")
.clusterUrl("127.0.0.1:7771,127.0.0.1:7772")
.build()
)
.build()
.start().block();
```
...
...
@@ -82,7 +102,7 @@ assert bootstrap != null;
## jar方式
1.
下载源码 mvn compile package -Dmaven.test.skip=true -P jar
1.
下载源码 mvn compile package -Dmaven.test.skip=true -P jar
,web
```
markdown
在smqtt-bootstrap/target目录下生成jar
...
...
@@ -92,40 +112,95 @@ assert bootstrap != null;
```
markdown
# 开启tcp端口
smqtt.tcp.port=1883
# 高水位
smqtt.tcp.lowWaterMark=4000000
# 低水位
smqtt.tcp.highWaterMark=80000000
# 开启ssl加密
smqtt.tcp.ssl=false
# 证书crt smqtt.tcp.ssl.crt =
# 证书key smqtt.tcp.ssl.key =
# 开启日志
smqtt.tcp.wiretap=false
# boss线程
smqtt.tcp.bossThreadSize=4
# work线程
smqtt.tcp.workThreadSize=8
# websocket端口
smqtt.websocket.port=8999
# websocket开启
smqtt.websocket.enable=true
# smqtt用户
smqtt.tcp.username=smqtt
# smqtt密码
smqtt.tcp.password=smqtt
# 开启http
smqtt.http.enable=true
# 开启http端口
smqtt.http.port=1999
# 开启http日志
smqtt.http.accesslog=true
# 开启ssl
smqtt.http.ssl.enable=false
# smqtt.http.ssl.crt =
# smqtt.http.ssl.key
# 日志级别 ALL|TRACE|DEBUG|INFO|WARN|ERROR|OFF
smqtt.log.level=INFO
# 开启tcp端口
smqtt.tcp.port=1883
# 高水位
smqtt.tcp.lowWaterMark=4000000
# 低水位
smqtt.tcp.highWaterMark=80000000
# 开启ssl加密
smqtt.tcp.ssl=false
# 证书crt smqtt.tcp.ssl.crt =
# 证书key smqtt.tcp.ssl.key =
# 开启日志
smqtt.tcp.wiretap=false
# boss线程
smqtt.tcp.bossThreadSize=4
# work线程
smqtt.tcp.workThreadSize=8
# websocket端口
smqtt.websocket.port=8999
# websocket开启
smqtt.websocket.enable=true
# smqtt用户
smqtt.tcp.username=smqtt
# smqtt密码
smqtt.tcp.password=smqtt
# 开启http
smqtt.http.enable=true
# 开启http日志
smqtt.http.accesslog=true
# 开启ssl
smqtt.http.ssl.enable=false
# smqtt.http.ssl.crt =
# smqtt.http.ssl.key =
# 开启管理后台(必须开启http)
smqtt.http.admin.enable=true
# 管理后台登录用户
smqtt.http.admin.username=smqtt
# 管理后台登录密码
smqtt.http.admin.password=smqtt
# 开启集群
smqtt.cluster.enable=false
# 集群节点地址
smqtt.cluster.url=127.0.0.1:7771,127.0.0.1:7772
# 节点端口
smqtt.cluster.port=7771
# 节点名称
smqtt.cluster.node=node-1
# 数据库配置(选配)
db.driverClassName=com.mysql.jdbc.Driver
db.url=jdbc:mysql://127.0.0.1:3306/smqtt?characterEncoding=utf-8&useSSL=false&useInformationSchema=true&serverTimezone=UTC
db.username=root
db.password=123
# 连接池初始化连接数
db.initialSize=10
# 连接池中最多支持多少个活动会话
db.maxActive=300
# 向连接池中请求连接时,超过maxWait的值后,认为本次请求失败
db.maxWait=60000
# 回收空闲连接时,将保证至少有minIdle个连接
db.minIdle=2
# redis配置(选配)
# 单机模式:single 哨兵模式:sentinel 集群模式:cluster
redis.mode=single
# 数据库
redis.database=0
# 密码
redis.password=
# 超时时间
redis.timeout=3000
# 最小空闲数
redis.pool.min.idle=8
# 连接超时时间(毫秒)
redis.pool.conn.timeout=3000
# 连接池大小
redis.pool.size=10
# 单机配置
redis.single.address=127.0.0.1:6379
# 集群配置
redis.cluster.scan.interval=1000
redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005
redis.cluster.read.mode=SLAVE
redis.cluster.retry.attempts=3
redis.cluster.slave.connection.pool.size=64
redis.cluster.master.connection.pool.size=64
redis.cluster.retry.interval=1500
# 哨兵配置
redis.sentinel.master=mymaster
redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26379,127.0.0.1:26379
```
3. 启动服务
...
...
@@ -153,7 +228,7 @@ docker pull 1ssqq1lxr/smqtt:latest
docker run -it -p 1883:1883 1ssqq1lxr/smqtt
```
启动镜像使用自定义配置(
准备配置文件conf.properties)
启动镜像使用自定义配置(
同上
准备配置文件conf.properties)
```
...
...
@@ -173,6 +248,44 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1
curl -H "Content-Type: application/json" -X POST -d '{"topic": "test/teus", "qos":2, "retain":true, "message":"我来测试保留消息3" }' "http://localhost:1999/smqtt/publish"
```
## 管理后台(60000端口)
### 如何开启
- main启动
设置httpOptions && enableAdmin = true
```
Bootstrap.httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
```
- jar / docker 启动
设置config.properties
```
# 开启http
smqtt.http.enable=true
# 开启http日志
smqtt.http.accesslog=true
# 开启ssl
smqtt.http.ssl.enable=false
# smqtt.http.ssl.crt =
# smqtt.http.ssl.key =
# 开启管理后台(必须开启http)
smqtt.http.admin.enable=true
# 管理后台登录用户
smqtt.http.admin.username=smqtt
# 管理后台登录密码
smqtt.http.admin.password=smqtt
```
### 页面预览
![image](icon/admin.png)
## 压测文档
[点这里](https://blog.csdn.net/JingleYe/article/details/118190935)
...
...
docs/http/2.fs.md
浏览文件 @
97aefe43
...
...
@@ -32,158 +32,3 @@ sort: 2
curl -H "Content-Type: application/json" -X POST -d '{"topic": "test/teus", "qos":2, "retain":true, "message":"我来测试保留消息3" }' "http://localhost:1999/smqtt/publish"
```
-
##获取当前连接
> 系统内置了io.github.quickmsg.core.http.ConnectionActor,用于获取当前连接。使用方式如下:
-
请求url /smqtt/connection
-
请求方式 POST
-
请求Body
无
-
返回Body
| 参数 | 说明 | 必传 |
| ---- | ---- |---- |
| connection | 连接 |是 |
| status | 状态 |是 |
| activeTime | 激活时间 |是 |
| authTime | 认证时间 |是 |
| sessionPersistent | session开启 |是 |
| will | 遗嘱消息 |是 |
| keepalive | 心跳时间 |是 |
| topics | 订阅topic |是 |
```
markdown
[
{
"activeTime": 1624113398783,
"authTime": 1624113398805,
"clientIdentifier": "mqttx_20fe13251",
"connection": {
"disposed": false,
"inboundCancelled": false,
"inboundDisposed": false,
"persistent": true,
"subscriptionDisposed": false
},
"keepalive": 60,
"sessionPersistent": true,
"status": "ONLINE",
"topics": [],
"will": {
"mqttQoS": "AT_MOST_ONCE",
"retain": false,
"willMessage": "YXNkbmJsa2hhc2Jk",
"willTopic": "test/close"
}
}
]
```
-
发送请求
```
curl -H "Content-Type: application/json" -X POST "http://localhost:1999/smqtt/connection"
```
-
##获取当前集群信息
> 系统内置了io.github.quickmsg.core.http.actors.ClusterActor,用于获取当前连接。使用方式如下:
-
请求url /smqtt/cluster
-
请求方式 POST
-
请求Body
无
-
返回Body
| 参数 | 说明 | 必传 |
| ---- | ---- |---- |
| alias | node名称 |是 |
| host | 主机ip |是 |
| port | 端口 |是 |
| namespace | 命名空间 |是 |
```
markdown
[
{
"alias": "node-4",
"host": "169.254.122.50",
"namespace": "default",
"port": 7771
},
{
"alias": "node-3",
"host": "169.254.122.50",
"namespace": "default",
"port": 7772
},
{
"alias": "node-2",
"host": "169.254.122.50",
"namespace": "default",
"port": 7773
}
]
```
-
发送请求
```
curl -H "Content-Type: application/json" -X POST "http://localhost:1999/smqtt/cluster"
```
-
##获取当前订阅信息
> 系统内置了io.github.quickmsg.core.http.actors.SubscribeActor,用于获取当前连接。使用方式如下:
-
请求url /smqtt/subscribe
-
请求方式 POST
-
请求Body
无
-
返回Body
| 参数 | 说明 | 必传 |
| ---- | ---- |---- |
| activeTime | 激活時間 |是 |
| authTime | 认证时间 |是 |
| clientIdentifier | 客户端id |是 |
| keepalive | 心跳时间 |是 |
| sessionPersistent | 是否持久化 |是 |
| status | 在线状态 |是 |
| topics | 订阅topic |是 |
```
markdown
{
"test/+": [
{
"activeTime": 1624110181885,
"authTime": 1624110181923,
"clientIdentifier": "client-id-3",
"connection": {
"disposed": false,
"inboundCancelled": false,
"inboundDisposed": false,
"persistent": true,
"subscriptionDisposed": false
},
"keepalive": 20,
"sessionPersistent": true,
"status": "ONLINE",
"topics": [
"test/+"
]
}
]
}
```
-
发送请求
```
curl -H "Content-Type: application/json" -X POST "http://localhost:1999/smqtt/cluster"
```
\ No newline at end of file
icon/admin.png
0 → 100644
浏览文件 @
97aefe43
76.4 KB
smqtt-common/src/main/java/io/github/quickmsg/common/message/HttpPublishMessage.java
浏览文件 @
97aefe43
...
...
@@ -5,6 +5,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage;
import
io.netty.handler.codec.mqtt.MqttQoS
;
import
lombok.Data
;
import
java.nio.charset.StandardCharsets
;
import
java.util.Map
;
/**
...
...
@@ -31,7 +32,7 @@ public class HttpPublishMessage {
retain
,
1
,
topic
,
PooledByteBufAllocator
.
DEFAULT
.
buffer
().
writeBytes
(
message
.
getBytes
()));
PooledByteBufAllocator
.
DEFAULT
.
buffer
().
writeBytes
(
message
.
getBytes
(
StandardCharsets
.
UTF_8
)));
}
...
...
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/PublishActor.java
浏览文件 @
97aefe43
...
...
@@ -6,13 +6,15 @@ import io.github.quickmsg.common.config.Configuration;
import
io.github.quickmsg.common.enums.HttpType
;
import
io.github.quickmsg.common.message.HttpPublishMessage
;
import
io.github.quickmsg.core.http.AbstractHttpActor
;
import
io.github.quickmsg.core.http.HttpConfiguration
;
import
lombok.extern.slf4j.Slf4j
;
import
org.reactivestreams.Publisher
;
import
reactor.core.publisher.Mono
;
import
reactor.netty.http.server.HttpServerRequest
;
import
reactor.netty.http.server.HttpServerResponse
;
import
java.nio.charset.Charset
;
import
java.nio.charset.StandardCharsets
;
/**
* @author luxurong
*/
...
...
@@ -24,9 +26,11 @@ public class PublishActor extends AbstractHttpActor {
@Override
public
Publisher
<
Void
>
doRequest
(
HttpServerRequest
request
,
HttpServerResponse
response
,
Configuration
httpConfiguration
)
{
Charset
charset
=
Charset
.
defaultCharset
();
log
.
info
(
"Charset is {}"
,
charset
);
return
request
.
receive
()
.
asString
()
.
asString
(
StandardCharsets
.
UTF_8
)
.
map
(
this
.
toJson
(
HttpPublishMessage
.
class
))
.
doOnNext
(
message
->
{
this
.
sendMqttMessage
(
message
.
getPublishMessage
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录