提交 6b3d5f1b 编写于 作者: S Sebastien Deleuze

Turn FlushingDataBuffer to an empty DataBuffer

上级 3c80c19c
...@@ -21,26 +21,25 @@ import java.io.OutputStream; ...@@ -21,26 +21,25 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.IntPredicate; import java.util.function.IntPredicate;
import org.springframework.util.Assert;
/** /**
* {@link DataBuffer} wrapper that indicates the file or the socket writing this buffer * Empty {@link DataBuffer} that indicates to the file or the socket writing it that
* should be flushed. * previously buffered data should be flushed.
* *
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @see FlushingDataBuffer#INSTANCE
*/ */
public class FlushingDataBuffer implements DataBuffer { public class FlushingDataBuffer implements DataBuffer {
/** Singleton instance of this class */
public static final FlushingDataBuffer INSTANCE = new FlushingDataBuffer();
private final DataBuffer buffer; private final DataBuffer buffer;
public FlushingDataBuffer() {
private FlushingDataBuffer() {
this.buffer = new DefaultDataBufferFactory().allocateBuffer(0); this.buffer = new DefaultDataBufferFactory().allocateBuffer(0);
} }
public FlushingDataBuffer(DataBuffer buffer) {
Assert.notNull(buffer);
this.buffer = buffer;
}
@Override @Override
public DataBufferFactory factory() { public DataBufferFactory factory() {
......
...@@ -115,7 +115,8 @@ public class SseEventEncoder extends AbstractEncoder<Object> { ...@@ -115,7 +115,8 @@ public class SseEventEncoder extends AbstractEncoder<Object> {
return Flux.concat( return Flux.concat(
encodeString(sb.toString(), bufferFactory), encodeString(sb.toString(), bufferFactory),
dataBuffer, dataBuffer,
encodeString("\n", bufferFactory).map(b -> new FlushingDataBuffer(b)) encodeString("\n", bufferFactory),
Mono.just(FlushingDataBuffer.INSTANCE)
); );
}); });
......
...@@ -18,6 +18,7 @@ package org.springframework.core.codec.support; ...@@ -18,6 +18,7 @@ package org.springframework.core.codec.support;
import java.util.Arrays; import java.util.Arrays;
import static org.junit.Assert.*;
import org.junit.Test; import org.junit.Test;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
...@@ -26,14 +27,11 @@ import reactor.core.test.TestSubscriber; ...@@ -26,14 +27,11 @@ import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;
import org.springframework.http.codec.SseEventEncoder; import org.springframework.http.codec.SseEventEncoder;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import org.springframework.web.reactive.sse.SseEvent; import org.springframework.web.reactive.sse.SseEvent;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** /**
* @author Sebastien Deleuze * @author Sebastien Deleuze
*/ */
...@@ -77,7 +75,8 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { ...@@ -77,7 +75,8 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
"event:foo\n" + "event:foo\n" +
"retry:123\n" + "retry:123\n" +
":bla\n:bla bla\n:bla bla bla\n"), ":bla\n:bla bla\n:bla bla bla\n"),
stringConsumer("\n") stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
); );
} }
...@@ -93,8 +92,10 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { ...@@ -93,8 +92,10 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
.assertValuesWith( .assertValuesWith(
stringConsumer("data:foo\n"), stringConsumer("data:foo\n"),
stringConsumer("\n"), stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
stringConsumer("data:bar\n"), stringConsumer("data:bar\n"),
stringConsumer("\n") stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
); );
} }
...@@ -110,12 +111,13 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { ...@@ -110,12 +111,13 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
.assertValuesWith( .assertValuesWith(
stringConsumer("data:foo\ndata:bar\n"), stringConsumer("data:foo\ndata:bar\n"),
stringConsumer("\n"), stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
stringConsumer("data:foo\ndata:baz\n"), stringConsumer("data:foo\ndata:baz\n"),
stringConsumer("\n") stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
); );
} }
@Test @Test
public void encodePojo() { public void encodePojo() {
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
...@@ -130,10 +132,12 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { ...@@ -130,10 +132,12 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"), stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"),
stringConsumer("\n"), stringConsumer("\n"),
stringConsumer("\n"), stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
stringConsumer("data:"), stringConsumer("data:"),
stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"), stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"),
stringConsumer("\n"), stringConsumer("\n"),
stringConsumer("\n") stringConsumer("\n"),
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
); );
} }
......
...@@ -70,14 +70,15 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest ...@@ -70,14 +70,15 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Flux<DataBuffer> responseBody = Flux Flux<DataBuffer> responseBody = Flux
.interval(50) .interval(50)
.take(2)
.concatWith(Flux.never())
.map(l -> { .map(l -> {
byte[] data = ("data" + l).getBytes(); byte[] data = ("data" + l).getBytes();
DataBuffer buffer = response.bufferFactory().allocateBuffer(data.length); DataBuffer buffer = response.bufferFactory().allocateBuffer(data.length);
buffer.write(data); buffer.write(data);
return new FlushingDataBuffer(buffer); return buffer;
}); })
.take(2)
.concatWith(Mono.just(FlushingDataBuffer.INSTANCE))
.concatWith(Flux.never());
return response.writeWith(responseBody); return response.writeWith(responseBody);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册