Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
dk131072
spring-framework
提交
c187cb2f
S
spring-framework
项目概览
dk131072
/
spring-framework
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
spring-framework
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c187cb2f
编写于
11月 08, 2018
作者:
R
Rossen Stoyanchev
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Ensure client response is drained with onStatus hook
Issue: SPR-17473
上级
ed1d63dc
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
224 addition
and
25 deletion
+224
-25
spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java
.../core/io/buffer/AbstractDataBufferAllocatingTestCase.java
+41
-19
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
...ework/http/client/reactive/ReactorClientHttpResponse.java
+8
-0
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
...mework/web/reactive/function/client/DefaultWebClient.java
+17
-6
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
...ringframework/web/reactive/function/client/WebClient.java
+3
-0
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java
...e/function/client/WebClientDataBufferAllocatingTests.java
+151
-0
src/docs/asciidoc/web/webflux-webclient.adoc
src/docs/asciidoc/web/webflux-webclient.adoc
+4
-0
未找到文件。
spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java
浏览文件 @
c187cb2f
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
package
org.springframework.core.io.buffer
;
package
org.springframework.core.io.buffer
;
import
java.nio.charset.StandardCharsets
;
import
java.nio.charset.StandardCharsets
;
import
java.time.Duration
;
import
java.time.Instant
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.List
;
import
java.util.function.Consumer
;
import
java.util.function.Consumer
;
...
@@ -37,7 +39,11 @@ import org.springframework.core.io.buffer.support.DataBufferTestUtils;
...
@@ -37,7 +39,11 @@ import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import
static
org
.
junit
.
Assert
.*;
import
static
org
.
junit
.
Assert
.*;
/**
/**
* Base class for tests that read or write data buffers with a rule to check
* that allocated buffers have been released.
*
* @author Arjen Poutsma
* @author Arjen Poutsma
* @author Rossen Stoyanchev
*/
*/
@RunWith
(
Parameterized
.
class
)
@RunWith
(
Parameterized
.
class
)
public
abstract
class
AbstractDataBufferAllocatingTestCase
{
public
abstract
class
AbstractDataBufferAllocatingTestCase
{
...
@@ -62,6 +68,7 @@ public abstract class AbstractDataBufferAllocatingTestCase {
...
@@ -62,6 +68,7 @@ public abstract class AbstractDataBufferAllocatingTestCase {
@Rule
@Rule
public
final
Verifier
leakDetector
=
new
LeakDetector
();
public
final
Verifier
leakDetector
=
new
LeakDetector
();
protected
DataBuffer
createDataBuffer
(
int
capacity
)
{
protected
DataBuffer
createDataBuffer
(
int
capacity
)
{
return
this
.
bufferFactory
.
allocateBuffer
(
capacity
);
return
this
.
bufferFactory
.
allocateBuffer
(
capacity
);
}
}
...
@@ -93,30 +100,45 @@ public abstract class AbstractDataBufferAllocatingTestCase {
...
@@ -93,30 +100,45 @@ public abstract class AbstractDataBufferAllocatingTestCase {
};
};
}
}
/**
private
class
LeakDetector
extends
Verifier
{
* Wait until allocations are at 0, or the given duration elapses.
*/
@Override
protected
void
waitForDataBufferRelease
(
Duration
duration
)
throws
InterruptedException
{
protected
void
verify
()
throws
Throwable
{
Instant
start
=
Instant
.
now
();
if
(
bufferFactory
instanceof
NettyDataBufferFactory
)
{
while
(
Instant
.
now
().
isBefore
(
start
.
plus
(
duration
)))
{
ByteBufAllocator
byteBufAllocator
=
try
{
((
NettyDataBufferFactory
)
bufferFactory
).
getByteBufAllocator
();
verifyAllocations
();
if
(
byteBufAllocator
instanceof
PooledByteBufAllocator
)
{
break
;
PooledByteBufAllocator
pooledByteBufAllocator
=
}
(
PooledByteBufAllocator
)
byteBufAllocator
;
catch
(
AssertionError
ex
)
{
PooledByteBufAllocatorMetric
metric
=
pooledByteBufAllocator
.
metric
();
// ignore;
long
allocations
=
calculateAllocations
(
metric
.
directArenas
())
+
calculateAllocations
(
metric
.
heapArenas
());
assertTrue
(
"ByteBuf leak detected: "
+
allocations
+
" allocations were not released"
,
allocations
==
0
);
}
}
}
Thread
.
sleep
(
50
);
}
}
}
private
long
calculateAllocations
(
List
<
PoolArenaMetric
>
metrics
)
{
private
void
verifyAllocations
()
{
return
metrics
.
stream
().
mapToLong
(
PoolArenaMetric:
:
numActiveAllocations
).
sum
();
if
(
this
.
bufferFactory
instanceof
NettyDataBufferFactory
)
{
ByteBufAllocator
allocator
=
((
NettyDataBufferFactory
)
this
.
bufferFactory
).
getByteBufAllocator
();
if
(
allocator
instanceof
PooledByteBufAllocator
)
{
PooledByteBufAllocatorMetric
metric
=
((
PooledByteBufAllocator
)
allocator
).
metric
();
long
total
=
getAllocations
(
metric
.
directArenas
())
+
getAllocations
(
metric
.
heapArenas
());
assertEquals
(
"ByteBuf Leak: "
+
total
+
" unreleased allocations"
,
0
,
total
);
}
}
}
}
private
static
long
getAllocations
(
List
<
PoolArenaMetric
>
metrics
)
{
return
metrics
.
stream
().
mapToLong
(
PoolArenaMetric:
:
numActiveAllocations
).
sum
();
}
protected
class
LeakDetector
extends
Verifier
{
@Override
public
void
verify
()
{
AbstractDataBufferAllocatingTestCase
.
this
.
verifyAllocations
();
}
}
}
}
}
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
浏览文件 @
c187cb2f
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
package
org.springframework.http.client.reactive
;
package
org.springframework.http.client.reactive
;
import
java.util.Collection
;
import
java.util.Collection
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
io.netty.buffer.ByteBufAllocator
;
import
io.netty.buffer.ByteBufAllocator
;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Flux
;
...
@@ -28,6 +29,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
...
@@ -28,6 +29,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
import
org.springframework.http.HttpHeaders
;
import
org.springframework.http.HttpHeaders
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.ResponseCookie
;
import
org.springframework.http.ResponseCookie
;
import
org.springframework.util.Assert
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.MultiValueMap
;
import
org.springframework.util.MultiValueMap
;
...
@@ -47,6 +49,8 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
...
@@ -47,6 +49,8 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
private
final
NettyInbound
inbound
;
private
final
NettyInbound
inbound
;
private
final
AtomicBoolean
bodyConsumed
=
new
AtomicBoolean
();
public
ReactorClientHttpResponse
(
HttpClientResponse
response
,
NettyInbound
inbound
,
ByteBufAllocator
alloc
)
{
public
ReactorClientHttpResponse
(
HttpClientResponse
response
,
NettyInbound
inbound
,
ByteBufAllocator
alloc
)
{
this
.
response
=
response
;
this
.
response
=
response
;
...
@@ -58,6 +62,10 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
...
@@ -58,6 +62,10 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
@Override
public
Flux
<
DataBuffer
>
getBody
()
{
public
Flux
<
DataBuffer
>
getBody
()
{
return
this
.
inbound
.
receive
()
return
this
.
inbound
.
receive
()
.
doOnSubscribe
(
s
->
// See https://github.com/reactor/reactor-netty/issues/503
Assert
.
state
(
this
.
bodyConsumed
.
compareAndSet
(
false
,
true
),
"The client response body can only be consumed once."
))
.
map
(
byteBuf
->
{
.
map
(
byteBuf
->
{
byteBuf
.
retain
();
byteBuf
.
retain
();
return
this
.
bufferFactory
.
wrap
(
byteBuf
);
return
this
.
bufferFactory
.
wrap
(
byteBuf
);
...
...
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
浏览文件 @
c187cb2f
...
@@ -436,19 +436,30 @@ class DefaultWebClient implements WebClient {
...
@@ -436,19 +436,30 @@ class DefaultWebClient implements WebClient {
private
<
T
extends
Publisher
<?>>
T
bodyToPublisher
(
ClientResponse
response
,
private
<
T
extends
Publisher
<?>>
T
bodyToPublisher
(
ClientResponse
response
,
T
bodyPublisher
,
Function
<
Mono
<?
extends
Throwable
>,
T
>
errorFunction
)
{
T
bodyPublisher
,
Function
<
Mono
<?
extends
Throwable
>,
T
>
errorFunction
)
{
if
(
HttpStatus
.
resolve
(
response
.
rawStatusCode
())
!=
null
)
{
if
(
HttpStatus
.
resolve
(
response
.
rawStatusCode
())
!=
null
)
{
return
this
.
statusHandlers
.
stream
()
for
(
StatusHandler
handler
:
this
.
statusHandlers
)
{
.
filter
(
statusHandler
->
statusHandler
.
test
(
response
.
statusCode
()))
if
(
handler
.
test
(
response
.
statusCode
()))
{
.
findFirst
()
Mono
<?
extends
Throwable
>
exMono
=
handler
.
apply
(
response
);
.
map
(
statusHandler
->
statusHandler
.
apply
(
response
))
exMono
=
exMono
.
flatMap
(
ex
->
drainBody
(
response
,
ex
));
.
map
(
errorFunction:
:
apply
)
exMono
=
exMono
.
onErrorResume
(
ex
->
drainBody
(
response
,
ex
));
.
orElse
(
bodyPublisher
);
return
errorFunction
.
apply
(
exMono
);
}
}
return
bodyPublisher
;
}
}
else
{
else
{
return
errorFunction
.
apply
(
createResponseException
(
response
));
return
errorFunction
.
apply
(
createResponseException
(
response
));
}
}
}
}
@SuppressWarnings
(
"unchecked"
)
private
<
T
>
Mono
<
T
>
drainBody
(
ClientResponse
response
,
Throwable
ex
)
{
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore errors in case it did consume it.
return
(
Mono
<
T
>)
response
.
bodyToMono
(
Void
.
class
).
onErrorMap
(
ex2
->
ex
).
thenReturn
(
ex
);
}
private
static
Mono
<
WebClientResponseException
>
createResponseException
(
ClientResponse
response
)
{
private
static
Mono
<
WebClientResponseException
>
createResponseException
(
ClientResponse
response
)
{
return
DataBufferUtils
.
join
(
response
.
body
(
BodyExtractors
.
toDataBuffers
()))
return
DataBufferUtils
.
join
(
response
.
body
(
BodyExtractors
.
toDataBuffers
()))
.
map
(
dataBuffer
->
{
.
map
(
dataBuffer
->
{
...
...
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
浏览文件 @
c187cb2f
...
@@ -595,6 +595,9 @@ public interface WebClient {
...
@@ -595,6 +595,9 @@ public interface WebClient {
* {@link WebClientResponseException} when the response status code is 4xx or 5xx.
* {@link WebClientResponseException} when the response status code is 4xx or 5xx.
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* applies
* applies
* <p><strong>NOTE:</strong> if the response is expected to have content,
* the exceptionFunction should consume it. If not, the content will be
* automatically drained to ensure resources are released.
* @param exceptionFunction the function that returns the exception
* @param exceptionFunction the function that returns the exception
* @return this builder
* @return this builder
*/
*/
...
...
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java
0 → 100644
浏览文件 @
c187cb2f
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.springframework.web.reactive.function.client
;
import
java.time.Duration
;
import
java.util.function.Function
;
import
io.netty.buffer.ByteBufAllocator
;
import
io.netty.channel.ChannelOption
;
import
okhttp3.mockwebserver.MockResponse
;
import
okhttp3.mockwebserver.MockWebServer
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
import
reactor.core.publisher.Mono
;
import
reactor.test.StepVerifier
;
import
org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase
;
import
org.springframework.core.io.buffer.NettyDataBufferFactory
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.client.reactive.ReactorClientHttpConnector
;
import
org.springframework.http.client.reactive.ReactorResourceFactory
;
import
static
org
.
junit
.
Assert
.*;
/**
* WebClient integration tests focusing on data buffer management.
* @author Rossen Stoyanchev
*/
public
class
WebClientDataBufferAllocatingTests
extends
AbstractDataBufferAllocatingTestCase
{
private
static
final
Duration
DELAY
=
Duration
.
ofSeconds
(
5
);
private
MockWebServer
server
;
private
WebClient
webClient
;
private
ReactorResourceFactory
factory
;
@Before
public
void
setUp
()
{
this
.
factory
=
new
ReactorResourceFactory
();
this
.
factory
.
afterPropertiesSet
();
this
.
server
=
new
MockWebServer
();
this
.
webClient
=
WebClient
.
builder
()
.
clientConnector
(
initConnector
())
.
baseUrl
(
this
.
server
.
url
(
"/"
).
toString
())
.
build
();
}
private
ReactorClientHttpConnector
initConnector
()
{
if
(
bufferFactory
instanceof
NettyDataBufferFactory
)
{
ByteBufAllocator
allocator
=
((
NettyDataBufferFactory
)
bufferFactory
).
getByteBufAllocator
();
return
new
ReactorClientHttpConnector
(
this
.
factory
,
httpClient
->
httpClient
.
tcpConfiguration
(
tcpClient
->
tcpClient
.
option
(
ChannelOption
.
ALLOCATOR
,
allocator
)));
}
else
{
return
new
ReactorClientHttpConnector
();
}
}
@After
public
void
shutDown
()
throws
InterruptedException
{
waitForDataBufferRelease
(
Duration
.
ofSeconds
(
2
));
this
.
factory
.
destroy
();
}
@Test
public
void
bodyToMonoVoid
()
{
this
.
server
.
enqueue
(
new
MockResponse
()
.
setResponseCode
(
201
)
.
setHeader
(
"Content-Type"
,
"application/json"
)
.
setChunkedBody
(
"{\"foo\" : {\"bar\" : \"123\", \"baz\" : \"456\"}}"
,
5
));
Mono
<
Void
>
mono
=
this
.
webClient
.
get
()
.
uri
(
"/json"
).
accept
(
MediaType
.
APPLICATION_JSON
)
.
retrieve
()
.
bodyToMono
(
Void
.
class
);
StepVerifier
.
create
(
mono
).
expectComplete
().
verify
(
Duration
.
ofSeconds
(
3
));
assertEquals
(
1
,
this
.
server
.
getRequestCount
());
}
@Test
public
void
onStatusWithBodyNotConsumed
()
{
RuntimeException
ex
=
new
RuntimeException
(
"response error"
);
testOnStatus
(
ex
,
response
->
Mono
.
just
(
ex
));
}
@Test
public
void
onStatusWithBodyConsumed
()
{
RuntimeException
ex
=
new
RuntimeException
(
"response error"
);
testOnStatus
(
ex
,
response
->
response
.
bodyToMono
(
Void
.
class
).
thenReturn
(
ex
));
}
@Test
// SPR-17473
public
void
onStatusWithMonoErrorAndBodyNotConsumed
()
{
RuntimeException
ex
=
new
RuntimeException
(
"response error"
);
testOnStatus
(
ex
,
response
->
Mono
.
error
(
ex
));
}
@Test
public
void
onStatusWithMonoErrorAndBodyConsumed
()
{
RuntimeException
ex
=
new
RuntimeException
(
"response error"
);
testOnStatus
(
ex
,
response
->
response
.
bodyToMono
(
Void
.
class
).
then
(
Mono
.
error
(
ex
)));
}
private
void
testOnStatus
(
Throwable
expected
,
Function
<
ClientResponse
,
Mono
<?
extends
Throwable
>>
exceptionFunction
)
{
HttpStatus
errorStatus
=
HttpStatus
.
BAD_GATEWAY
;
this
.
server
.
enqueue
(
new
MockResponse
()
.
setResponseCode
(
errorStatus
.
value
())
.
setHeader
(
"Content-Type"
,
"application/json"
)
.
setChunkedBody
(
"{\"error\" : {\"status\" : 502, \"message\" : \"Bad gateway.\"}}"
,
5
));
Mono
<
String
>
mono
=
this
.
webClient
.
get
()
.
uri
(
"/json"
).
accept
(
MediaType
.
APPLICATION_JSON
)
.
retrieve
()
.
onStatus
(
status
->
status
.
equals
(
errorStatus
),
exceptionFunction
)
.
bodyToMono
(
String
.
class
);
StepVerifier
.
create
(
mono
).
expectErrorSatisfies
(
actual
->
assertSame
(
expected
,
actual
)).
verify
(
DELAY
);
assertEquals
(
1
,
this
.
server
.
getRequestCount
());
}
}
src/docs/asciidoc/web/webflux-webclient.adoc
浏览文件 @
c187cb2f
...
@@ -294,6 +294,10 @@ as the following example shows:
...
@@ -294,6 +294,10 @@ as the following example shows:
----
----
====
====
When `onStatus` is used, if the response is expected to have content, then the `onStatus`
callback should consume it. If not, the content will be automatically drained to ensure
resources are released.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录