提交 20742c74 编写于 作者: R Rossen Stoyanchev

Merge branch '5.1.x'

/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -28,6 +28,8 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
......@@ -279,13 +281,20 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
}
private boolean emitCachedSignals() {
if (this.item != null) {
requiredWriteSubscriber().onNext(this.item);
}
if (this.error != null) {
requiredWriteSubscriber().onError(this.error);
try {
requiredWriteSubscriber().onError(this.error);
}
finally {
releaseCachedItem();
}
return true;
}
T item = this.item;
this.item = null;
if (item != null) {
requiredWriteSubscriber().onNext(item);
}
if (this.completed) {
requiredWriteSubscriber().onComplete();
return true;
......@@ -298,7 +307,22 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
Subscription s = this.subscription;
if (s != null) {
this.subscription = null;
s.cancel();
try {
s.cancel();
}
finally {
releaseCachedItem();
}
}
}
private void releaseCachedItem() {
synchronized (this) {
Object item = this.item;
if (item instanceof DataBuffer) {
DataBufferUtils.release((DataBuffer) item);
}
this.item = null;
}
}
......
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,25 +16,29 @@
package org.springframework.http.server.reactive;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import static org.junit.Assert.*;
/**
* @author Rossen Stoyanchev
......@@ -50,9 +54,6 @@ public class ChannelSendOperatorTests {
this.writer = new OneByOneAsyncWriter();
}
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);
}
@Test
public void errorBeforeFirstItem() throws Exception {
......@@ -130,6 +131,66 @@ public class ChannelSendOperatorTests {
assertSame(error, this.writer.error);
}
@Test // gh-22720
public void cancelWhileItemCached() {
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Mono.fromCallable(() -> {
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
dataBuffer.write("foo", StandardCharsets.UTF_8);
return dataBuffer;
}),
publisher -> {
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
publisher.subscribe(subscriber);
return Mono.never();
});
BaseSubscriber<Void> subscriber = new BaseSubscriber<Void>() {};
operator.subscribe(subscriber);
subscriber.cancel();
bufferFactory.checkForLeaks();
}
@Test // gh-22720
public void errorWhileItemCached() {
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Flux.create(sink -> {
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
dataBuffer.write("foo", StandardCharsets.UTF_8);
sink.next(dataBuffer);
sink.error(new IllegalStateException("err"));
}),
publisher -> {
publisher.subscribe(writeSubscriber);
return Mono.never();
});
operator.subscribe(new BaseSubscriber<Void>() {});
try {
writeSubscriber.signalDemand(1); // Let cached signals ("foo" and error) be published..
}
catch (Throwable ex) {
assertNotNull(ex.getCause());
assertEquals("err", ex.getCause().getMessage());
}
bufferFactory.checkForLeaks();
}
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);
}
private static class OneByOneAsyncWriter {
......@@ -182,4 +243,18 @@ public class ChannelSendOperatorTests {
}
}
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Just subscribe without requesting
}
public void signalDemand(long demand) {
upstream().request(demand);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册