Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
wushizhenking
advanced-java
提交
f546851c
A
advanced-java
项目概览
wushizhenking
/
advanced-java
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
advanced-java
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
f546851c
编写于
11月 12, 2019
作者:
Y
Yang Libin
提交者:
GitHub
11月 12, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #120 from huifer/master
micro service
上级
83b7789a
9f41b9c4
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
644 addition
and
3 deletion
+644
-3
README.md
README.md
+4
-3
docs/high-concurrency/huifer-how-to-limit-current.md
docs/high-concurrency/huifer-how-to-limit-current.md
+362
-0
docs/micro-services/huifer-what's-microservice-how-to-communicate.md
...services/huifer-what's-microservice-how-to-communicate.md
+278
-0
未找到文件。
README.md
浏览文件 @
f546851c
...
...
@@ -101,7 +101,7 @@
-
如何设计一个高可用系统?
### 限流
-
如何限流?在工作中是怎么做的?说一下具体的实现?
-
[
如何限流?在工作中是怎么做的?说一下具体的实现?
](
/docs/high-concurrency/huifer-how-to-limit-current.md
)
### 熔断
-
如何进行熔断?
...
...
@@ -118,11 +118,12 @@
-
[
微服务的事件驱动数据管理
](
/docs/micro-services/event-driven-data-management-for-microservices.md
)
### Spring Cloud 微服务架构
-
什么是微服务?微服务之间是如何独立通讯的?
-
[
什么是微服务?微服务之间是如何独立通讯的?
](
/docs/micro-services/huifer-what's-microservice-how-to-communicate.md
)
-
Spring Cloud 和 Dubbo 有哪些区别?
-
Spring Boot 和 Spring Cloud,谈谈你对它们的理解?
-
什么是服务熔断?什么是服务降级?
-
微服务的优缺点分别是什么?说一下你在项目开发中碰到的坑?
-
你所知道的微服务技术栈都有哪些?
-
[
你所知道的微服务技术栈都有哪些?
](
/docs/micro-services/huifer-micro-services-technology-stack%20.md
)
-
[
微服务治理策略
](
/docs/micro-services/huifer-micro-service-governance.md
)
-
Eureka 和 Zookeeper 都可以提供服务注册与发现的功能,它们有什么区别?
-
......
docs/high-concurrency/huifer-how-to-limit-current.md
0 → 100644
浏览文件 @
f546851c
# 如何限流?在工作中是怎么做的?说一下具体的实现?
-
Author:
[
HuiFer
](
https://github.com/huifer
)
-
Description: 该文简单介绍限流相关技术以及实现
## 什么是限流
> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
## 限流方法
### 计数器
#### 实现方式
-
控制单位时间内的请求数量
```
java
import
java.util.concurrent.atomic.AtomicInteger
;
public
class
Counter
{
/**
* 最大访问数量
*/
private
final
int
limit
=
10
;
/**
* 访问时间差
*/
private
final
long
timeout
=
1000
;
/**
* 请求时间
*/
private
long
time
;
/**
* 当前计数器
*/
private
AtomicInteger
reqCount
=
new
AtomicInteger
(
0
);
public
boolean
limit
()
{
long
now
=
System
.
currentTimeMillis
();
if
(
now
<
time
+
timeout
)
{
// 单位时间内
reqCount
.
addAndGet
(
1
);
return
reqCount
.
get
()
<=
limit
;
}
else
{
// 超出单位时间
time
=
now
;
reqCount
=
new
AtomicInteger
(
0
);
return
true
;
}
}
}
```
-
劣势
-
假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求
`n-1`
(n为限流请求数量),在下一分钟的 00:01 发送n个请求,这样在2秒钟内请求到达了
`2n - 1`
个.
-
设每分钟请求数量为60个,每秒可以处理1个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时2秒钟有120个请求(每秒60个请求),远远大于了每秒钟处理数量的阈值
### 滑动窗口
#### 实现方式
-
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位
-
把一分钟分成若干等分(6份,每份10秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加1.当等分数量越大限流统计就越详细
```
java
package
com.example.demo1.service
;
import
java.util.Iterator
;
import
java.util.Random
;
import
java.util.concurrent.ConcurrentLinkedQueue
;
import
java.util.stream.IntStream
;
public
class
TimeWindow
{
private
ConcurrentLinkedQueue
<
Long
>
queue
=
new
ConcurrentLinkedQueue
<
Long
>();
/**
* 间隔秒数
*/
private
int
seconds
;
/**
* 最大限流
*/
private
int
max
;
public
TimeWindow
(
int
max
,
int
seconds
)
{
this
.
seconds
=
seconds
;
this
.
max
=
max
;
/**
* 永续线程执行清理queue 任务
*/
new
Thread
(()
->
{
while
(
true
)
{
try
{
// 等待 间隔秒数-1 执行清理操作
Thread
.
sleep
((
seconds
-
1
)
*
1000L
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
clean
();
}
}).
start
();
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
final
TimeWindow
timeWindow
=
new
TimeWindow
(
10
,
1
);
// 测试3个线程
IntStream
.
range
(
0
,
3
).
forEach
((
i
)
->
{
new
Thread
(()
->
{
while
(
true
)
{
try
{
Thread
.
sleep
(
new
Random
().
nextInt
(
20
)
*
100
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
timeWindow
.
take
();
}
}).
start
();
});
}
/**
* 获取令牌,并且添加时间
*/
public
void
take
()
{
long
start
=
System
.
currentTimeMillis
();
try
{
int
size
=
sizeOfValid
();
if
(
size
>
max
)
{
System
.
err
.
println
(
"超限"
);
}
synchronized
(
queue
)
{
if
(
sizeOfValid
()
>
max
)
{
System
.
err
.
println
(
"超限"
);
System
.
err
.
println
(
"queue中有 "
+
queue
.
size
()
+
" 最大数量 "
+
max
);
}
this
.
queue
.
offer
(
System
.
currentTimeMillis
());
}
System
.
out
.
println
(
"queue中有 "
+
queue
.
size
()
+
" 最大数量 "
+
max
);
}
}
public
int
sizeOfValid
()
{
Iterator
<
Long
>
it
=
queue
.
iterator
();
Long
ms
=
System
.
currentTimeMillis
()
-
seconds
*
1000
;
int
count
=
0
;
while
(
it
.
hasNext
())
{
long
t
=
it
.
next
();
if
(
t
>
ms
)
{
// 在当前的统计时间范围内
count
++;
}
}
return
count
;
}
/**
* 清理过期的时间
*/
public
void
clean
()
{
Long
c
=
System
.
currentTimeMillis
()
-
seconds
*
1000
;
Long
tl
=
null
;
while
((
tl
=
queue
.
peek
())
!=
null
&&
tl
<
c
)
{
System
.
out
.
println
(
"清理数据"
);
queue
.
poll
();
}
}
}
```
### Leaky Bucket 漏桶
#### 实现方式
-
规定固定容量的桶,有水进入,有水流出. 对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度.
```
java
public
class
LeakBucket
{
/**
* 时间
*/
private
long
time
;
/**
* 总量
*/
private
Double
total
;
/**
* 水流出去的速度
*/
private
Double
rate
;
/**
* 当前总量
*/
private
Double
nowSize
;
public
boolean
limit
()
{
long
now
=
System
.
currentTimeMillis
();
nowSize
=
Math
.
max
(
0
,
(
nowSize
-
(
now
-
time
)
*
rate
));
time
=
now
;
if
((
nowSize
+
1
)
<
total
)
{
nowSize
++;
return
true
;
}
else
{
return
false
;
}
}
}
```
### Token Bucket 令牌桶
#### 实现方式
-
规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求
```
java
public
class
TokenBucket
{
/**
* 时间
*/
private
long
time
;
/**
* 总量
*/
private
Double
total
;
/**
* token 放入速度
*/
private
Double
rate
;
/**
* 当前总量
*/
private
Double
nowSize
;
public
boolean
limit
()
{
long
now
=
System
.
currentTimeMillis
();
nowSize
=
Math
.
min
(
total
,
nowSize
+
(
now
-
time
)
*
rate
);
time
=
now
;
if
(
nowSize
<
1
)
{
// 桶里没有token
return
false
;
}
else
{
// 存在token
nowSize
-=
1
;
return
true
;
}
}
}
```
## 工作中的使用
### spring cloud gateway
-
spring cloud gateway 默认使用redis进行限流,笔者一般只是修改修改参数属于拿来即用.并没有去从头实现上述那些算法.
```
xml
<dependency>
<groupId>
org.springframework.cloud
</groupId>
<artifactId>
spring-cloud-starter-gateway
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-data-redis-reactive
</artifactId>
</dependency>
```
```
yaml
spring
:
cloud
:
gateway
:
routes
:
-
id
:
requestratelimiter_route
uri
:
lb://pigx-upms
order
:
10000
predicates
:
-
Path=/admin/**
filters
:
-
name
:
RequestRateLimiter
args
:
redis-rate-limiter.replenishRate
:
1
# 令牌桶的容积
redis-rate-limiter.burstCapacity
:
3
# 流速 每秒
key-resolver
:
"
#{@remoteAddrKeyResolver}"
#SPEL表达式去的对应的bean
-
StripPrefix=1
```
```
java
@Bean
KeyResolver
remoteAddrKeyResolver
()
{
return
exchange
->
Mono
.
just
(
exchange
.
getRequest
().
getRemoteAddress
().
getHostName
());
}
```
### sentinel
-
通过配置来控制每个url的流量
```
xml
<dependency>
<groupId>
com.alibaba.cloud
</groupId>
<artifactId>
spring-cloud-starter-alibaba-sentinel
</artifactId>
</dependency>
```
```
yaml
spring
:
cloud
:
nacos
:
discovery
:
server-addr
:
localhost:8848
sentinel
:
transport
:
dashboard
:
localhost:8080
port
:
8720
datasource
:
ds
:
nacos
:
server-addr
:
localhost:8848
dataId
:
spring-cloud-sentinel-nacos
groupId
:
DEFAULT_GROUP
rule-type
:
flow
namespace
:
xxxxxxxx
```
-
配置内容在nacos上进行编辑
```
json
[
{
"resource"
:
"/hello"
,
"limitApp"
:
"default"
,
"grade"
:
1
,
"count"
:
1
,
"strategy"
:
0
,
"controlBehavior"
:
0
,
"clusterMode"
:
false
}
]
```
-
resource:资源名,即限流规则的作用对象。
-
limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
-
grade:限流阈值类型,QPS 或线程数模式,0代表根据并发数量来限流,1代表根据QPS来进行流量控制。
-
count:限流阈值
-
strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口
-
controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式)
-
clusterMode:是否为集群模式
### 总结
> sentinel和spring cloud gateway两个框架都是很好的限流框架,但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用,所以我会选择**spring cloud gateway**,当接入完整的或者接入Nacos项目使用setinel会有更加好的体验.
\ No newline at end of file
docs/micro-services/huifer-what's-microservice-how-to-communicate.md
0 → 100644
浏览文件 @
f546851c
# 什么是微服务?微服务之间是如何独立通讯的?
-
Author:
[
HuiFer
](
https://github.com/huifer
)
-
Description: 介绍微服务的定义,服务之间的通讯,对RPC 和
## 什么是微服务
> 微服务架构是一个分布式系统, 按照业务进行划分成为不同的服务单元, 解决单体系统性能等不足.
> 微服务是一种架构风格, 一个大型软件应用由多个服务单元组成. 系统中的服务单元可以单独部署, 各个服务单元之间是松耦合的.
> 微服务概念起源: [Microservices](https://martinfowler.com/articles/microservices.html)
>
## 微服务之间是如何独立通讯的
### 同步
#### REST HTTP 协议
> REST 请求在微服务中是最为常用的一种通讯方式, 它依赖于 HTTP\HTTPS 协议.
-
RESTFUL特点
1.
每一个URI代表1种资源
2.
客户端使用 GET、POST、PUT、DELETE 4个表示操作方式的动词对服务端资源进行操作: GET 用来获取资源, POST 用来新建资源(也可以用于更新资源), PUT 用来更新资源, DELETE 用来删除资源
3.
通过操作资源的表现形式来操作资源
4.
资源的表现形式是 XML 或者 HTML
5.
客户端与服务端之间的交互在请求之间是无状态的,从客户端到服务端的每个请求都必须包含理解请求所必需的信息
##### 例子
-
有一个服务方提供了如下接口.
```
java
@RestController
@RequestMapping
(
"/communication"
)
public
class
RestControllerDemo
{
@GetMapping
(
"/hello"
)
public
String
s
()
{
return
"hello"
;
}
}
```
-
另外一个服务需要去调用该接口, 调用方只需要根据 API 文档发送请求即可获取返回结果.
```
java
@RestController
@RequestMapping
(
"/demo"
)
public
class
RestDemo
{
@Autowired
RestTemplate
restTemplate
;
@GetMapping
(
"/hello2"
)
public
String
s2
()
{
String
forObject
=
restTemplate
.
getForObject
(
"http://localhost:9013/communication/hello"
,
String
.
class
);
return
forObject
;
}
}
```
-
通过这样的方式可以实现服务之间的通讯
#### RPC TCP协议
> RPC(Remote Procedure Call)远程过程调用, 简单的理解是一个节点请求另一个节点提供的服务
>
>工作流程
> 1. 执行客户端调用语句,传送参数
> 2. 调用本地系统发送网络消息
> 3. 消息传送到远程主机
> 4. 服务器得到消息并取得参数
> 5. 根据调用请求以及参数执行远程过程(服务)
> 6. 执行过程完毕,将结果返回服务器句柄
> 7. 服务器句柄返回结果,调用远程主机的系统网络服务发送结果
> 8. 消息传回本地主机
> 9. 客户端句柄由本地主机的网络服务接收消息
> 10. 客户端接收到调用语句返回的结果数据
-
这个不知道如何具体描述直接上代码.
-
首先需要一个服务端
```
java
import
java.io.IOException
;
import
java.io.ObjectInputStream
;
import
java.io.ObjectOutputStream
;
import
java.lang.reflect.Method
;
import
java.net.InetSocketAddress
;
import
java.net.ServerSocket
;
import
java.net.Socket
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
/**
* RPC 服务端用来注册远程方法的接口和实现类
* @Date: 2019-11-04
*/
public
class
RPCServer
{
private
static
ExecutorService
executor
=
Executors
.
newFixedThreadPool
(
Runtime
.
getRuntime
().
availableProcessors
());
private
static
final
ConcurrentHashMap
<
String
,
Class
>
serviceRegister
=
new
ConcurrentHashMap
<>();
/**
* 注册方法
* @param service
* @param impl
*/
public
void
register
(
Class
service
,
Class
impl
)
{
serviceRegister
.
put
(
service
.
getSimpleName
(),
impl
);
}
/**
* 启动方法
* @param port
*/
public
void
start
(
int
port
)
{
ServerSocket
socket
=
null
;
try
{
socket
=
new
ServerSocket
();
socket
.
bind
(
new
InetSocketAddress
(
port
));
System
.
out
.
println
(
"服务启动"
);
System
.
out
.
println
(
serviceRegister
);
while
(
true
)
{
executor
.
execute
(
new
Task
(
socket
.
accept
()));
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
if
(
socket
!=
null
)
{
try
{
socket
.
close
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
}
}
}
private
static
class
Task
implements
Runnable
{
Socket
client
=
null
;
public
Task
(
Socket
client
)
{
this
.
client
=
client
;
}
@Override
public
void
run
()
{
ObjectInputStream
input
=
null
;
ObjectOutputStream
output
=
null
;
try
{
input
=
new
ObjectInputStream
(
client
.
getInputStream
());
// 按照顺序读取对方写过来的内容
String
serviceName
=
input
.
readUTF
();
String
methodName
=
input
.
readUTF
();
Class
<?>[]
parameterTypes
=
(
Class
<?>[])
input
.
readObject
();
Object
[]
arguments
=
(
Object
[])
input
.
readObject
();
Class
serviceClass
=
serviceRegister
.
get
(
serviceName
);
if
(
serviceClass
==
null
)
{
throw
new
ClassNotFoundException
(
serviceName
+
" 没有找到!"
);
}
Method
method
=
serviceClass
.
getMethod
(
methodName
,
parameterTypes
);
Object
result
=
method
.
invoke
(
serviceClass
.
newInstance
(),
arguments
);
output
=
new
ObjectOutputStream
(
client
.
getOutputStream
());
output
.
writeObject
(
result
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
try
{
// 这里就不写 output!=null才关闭这个逻辑了
output
.
close
();
input
.
close
();
client
.
close
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
}
}
}
}
```
-
其次需要一个客户端
```
java
import
java.io.ObjectInputStream
;
import
java.io.ObjectOutputStream
;
import
java.lang.reflect.InvocationHandler
;
import
java.lang.reflect.Method
;
import
java.lang.reflect.Proxy
;
import
java.net.InetSocketAddress
;
import
java.net.Socket
;
/**
* RPC 客户端
* @Date: 2019-11-04
*/
public
class
RPCclient
<
T
>
{
/**
* 通过动态代理将参数发送过去到 RPCServer ,RPCserver 返回结果这个方法处理成为正确的实体
*/
public
static
<
T
>
T
getRemoteProxyObj
(
final
Class
<
T
>
service
,
final
InetSocketAddress
addr
)
{
return
(
T
)
Proxy
.
newProxyInstance
(
service
.
getClassLoader
(),
new
Class
<?>[]{
service
},
new
InvocationHandler
()
{
@Override
public
Object
invoke
(
Object
proxy
,
Method
method
,
Object
[]
args
)
throws
Throwable
{
Socket
socket
=
null
;
ObjectOutputStream
out
=
null
;
ObjectInputStream
input
=
null
;
try
{
socket
=
new
Socket
();
socket
.
connect
(
addr
);
// 将实体类,参数,发送给远程调用方
out
=
new
ObjectOutputStream
(
socket
.
getOutputStream
());
out
.
writeUTF
(
service
.
getSimpleName
());
out
.
writeUTF
(
method
.
getName
());
out
.
writeObject
(
method
.
getParameterTypes
());
out
.
writeObject
(
args
);
input
=
new
ObjectInputStream
(
socket
.
getInputStream
());
return
input
.
readObject
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
out
.
close
();
input
.
close
();
socket
.
close
();
}
return
null
;
}
});
}
}
```
-
再来一个测试的远程方法
```
java
public
interface
Tinterface
{
String
send
(
String
msg
);
}
public
class
TinterfaceImpl
implements
Tinterface
{
@Override
public
String
send
(
String
msg
)
{
return
"send message "
+
msg
;
}
}
```
-
测试代码
```
java
import
com.huifer.admin.rpc.Tinterface
;
import
com.huifer.admin.rpc.TinterfaceImpl
;
import
java.net.InetSocketAddress
;
/**
* @Date: 2019-11-04
*/
public
class
RunTest
{
public
static
void
main
(
String
[]
args
)
{
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
RPCServer
rpcServer
=
new
RPCServer
();
rpcServer
.
register
(
Tinterface
.
class
,
TinterfaceImpl
.
class
);
rpcServer
.
start
(
10000
);
}
}).
start
();
Tinterface
tinterface
=
RPCclient
.
getRemoteProxyObj
(
Tinterface
.
class
,
new
InetSocketAddress
(
"localhost"
,
10000
));
System
.
out
.
println
(
tinterface
.
send
(
"rpc 测试用例"
));
}
}
```
-
输出
`send message rpc 测试用例`
### 异步
#### 消息中间件
> 常见的消息中间件有 Kafka、ActiveMQ、RabbitMQ、RocketMQ , 常见的协议有AMQP、MQTTP、STOMP、XMPP. 这里不对消息队列进行拓展了, 具体如何使用还是请移步官网.
>
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录