提交 b7990135 编写于 作者: B Brian Clozel

Update to Reactor Aluminium SNAPSHOT

Currently the BOM versions are:

* reactor-core 3.0.6.BUILD-SNAPSHOT
* reactor-netty 0.6.2.BUILD-SNAPSHOT

This commit fixes as well a few deprecations in reactor-core.
上级 c8635de3
......@@ -77,7 +77,7 @@ configure(allprojects) { project ->
ext.poiVersion = "3.15"
ext.protobufVersion = "3.2.0"
ext.quartzVersion = "2.2.3"
ext.reactorVersion = "Aluminium-SR1"
ext.reactorVersion = "Aluminium-BUILD-SNAPSHOT"
ext.romeVersion = "1.7.1"
ext.rxjavaVersion = '1.2.7'
ext.rxjavaAdapterVersion = '1.2.1'
......@@ -178,6 +178,7 @@ configure(allprojects) { project ->
repositories {
maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/libs-snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
......
......@@ -113,7 +113,7 @@ public abstract class DataBufferUtils {
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
return Flux.create(emitter -> {
emitter.setCancellation(() -> closeChannel(channel));
emitter.onDispose(() -> closeChannel(channel));
AsynchronousFileChannelCompletionHandler completionHandler =
new AsynchronousFileChannelCompletionHandler(emitter, position,
dataBufferFactory, byteBuffer);
......
......@@ -15,6 +15,7 @@
*/
package org.springframework.core.convert.support;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
......@@ -109,7 +110,7 @@ public class ReactiveAdapterRegistryTests {
Publisher<Integer> source = Flowable.fromIterable(sequence);
Object target = getAdapter(Flux.class).fromPublisher(source);
assertTrue(target instanceof Flux);
assertEquals(sequence, ((Flux<Integer>) target).collectList().blockMillis(1000));
assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
// TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)?
......@@ -119,7 +120,7 @@ public class ReactiveAdapterRegistryTests {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
Object target = getAdapter(Mono.class).fromPublisher(source);
assertTrue(target instanceof Mono);
assertEquals(new Integer(1), ((Mono<Integer>) target).blockMillis(1000));
assertEquals(new Integer(1), ((Mono<Integer>) target).block(Duration.ofMillis(1000)));
}
@Test
......@@ -195,7 +196,7 @@ public class ReactiveAdapterRegistryTests {
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
assertEquals(sequence, ((Flux<Integer>) target).collectList().blockMillis(1000));
assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
@Test
......@@ -203,7 +204,7 @@ public class ReactiveAdapterRegistryTests {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
assertEquals(new Integer(1), ((Mono<Integer>) target).blockMillis(1000));
assertEquals(new Integer(1), ((Mono<Integer>) target).block(Duration.ofMillis(1000)));
}
@Test
......@@ -211,7 +212,7 @@ public class ReactiveAdapterRegistryTests {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
((Mono<Void>) target).blockMillis(1000);
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
@Test
......@@ -220,7 +221,7 @@ public class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
assertEquals(sequence, ((Flux<Integer>) target).collectList().blockMillis(1000));
assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
@Test
......@@ -229,7 +230,7 @@ public class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
assertEquals(sequence, ((Flux<Integer>) target).collectList().blockMillis(1000));
assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
@Test
......@@ -237,7 +238,7 @@ public class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
assertEquals(new Integer(1), ((Mono<Integer>) target).blockMillis(1000));
assertEquals(new Integer(1), ((Mono<Integer>) target).block(Duration.ofMillis(1000)));
}
@Test
......@@ -245,7 +246,7 @@ public class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
((Mono<Void>) target).blockMillis(1000);
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
@Test
......@@ -254,7 +255,7 @@ public class ReactiveAdapterRegistryTests {
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);
assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
assertEquals(new Integer(1), ((Mono<Integer>) target).blockMillis(1000));
assertEquals(new Integer(1), ((Mono<Integer>) target).block(Duration.ofMillis(1000)));
}
......
......@@ -17,6 +17,7 @@
package org.springframework.messaging.tcp.reactor;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
......@@ -175,7 +176,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
return flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt)));
.flatMap(attempt -> Mono.delay(
Duration.ofMillis(reconnectStrategy.getTimeToNextAttempt(attempt))));
}
@Override
......
......@@ -187,7 +187,7 @@ public class ExchangeResult {
private String formatBody(MediaType contentType, MonoProcessor<byte[]> body) {
if (body.isSuccess()) {
byte[] bytes = body.blockMillis(0);
byte[] bytes = body.block(Duration.ZERO);
if (bytes.length == 0) {
return "No content";
}
......
......@@ -67,7 +67,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
Mono<ServerSentEvent<String>> source = Mono.just(event);
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000);
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000));
StepVerifier.create(outputMessage.getBodyAsString())
.expectNext("id:c42\n" +
......@@ -86,7 +86,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
Flux<String> source = Flux.just("foo", "bar");
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000);
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000));
StepVerifier.create(outputMessage.getBodyAsString())
.expectNext("data:foo\n\ndata:bar\n\n")
......@@ -99,7 +99,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).blockMillis(5000);
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000));
StepVerifier.create(outputMessage.getBodyAsString())
.expectNext("data:foo\n" +
......@@ -116,7 +116,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
new Pojo("foofoofoo", "barbarbar"));
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
messageWriter.write(source, ResolvableType.forClass(Pojo.class),
MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).blockMillis(5000);
MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000));
StepVerifier.create(outputMessage.getBodyAsString())
.expectNext("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n\n" +
......@@ -135,7 +135,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
new Pojo("foofoofoo", "barbarbar"));
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
messageWriter.write(source, ResolvableType.forClass(Pojo.class),
MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).blockMillis(5000);
MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofMillis(5000));
StepVerifier.create(outputMessage.getBodyAsString())
.expectNext("data:{\n" +
......
......@@ -19,6 +19,7 @@ package org.springframework.web.bind.support;
import java.beans.PropertyEditorSupport;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
import java.util.Iterator;
import org.junit.Before;
......@@ -64,7 +65,7 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("spouse", "someValue");
formData.add("spouse.name", "test");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertNotNull(this.testBean.getSpouse());
assertEquals("test", testBean.getSpouse().getName());
......@@ -75,11 +76,11 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("_postProcessed", "visible");
formData.add("postProcessed", "on");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertTrue(this.testBean.isPostProcessed());
formData.remove("postProcessed");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertFalse(this.testBean.isPostProcessed());
}
......@@ -90,11 +91,11 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("_postProcessed", "visible");
formData.add("postProcessed", "on");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertTrue(this.testBean.isPostProcessed());
formData.remove("postProcessed");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertFalse(this.testBean.isPostProcessed());
}
......@@ -103,11 +104,11 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("!postProcessed", "off");
formData.add("postProcessed", "on");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertTrue(this.testBean.isPostProcessed());
formData.remove("postProcessed");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertFalse(this.testBean.isPostProcessed());
}
......@@ -117,15 +118,15 @@ public class WebExchangeDataBinderTests {
formData.add("!postProcessed", "on");
formData.add("_postProcessed", "visible");
formData.add("postProcessed", "on");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertTrue(this.testBean.isPostProcessed());
formData.remove("postProcessed");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertTrue(this.testBean.isPostProcessed());
formData.remove("!postProcessed");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertFalse(this.testBean.isPostProcessed());
}
......@@ -134,11 +135,11 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("!name", "anonymous");
formData.add("name", "Scott");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertEquals("Scott", this.testBean.getName());
formData.remove("name");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertEquals("anonymous", this.testBean.getName());
}
......@@ -148,12 +149,12 @@ public class WebExchangeDataBinderTests {
formData.add("stringArray", "bar");
formData.add("stringArray", "abc");
formData.add("stringArray", "123,def");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertEquals("Expected all three items to be bound", 3, this.testBean.getStringArray().length);
formData.remove("stringArray");
formData.add("stringArray", "123,def");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertEquals("Expected only 1 item to be bound", 1, this.testBean.getStringArray().length);
}
......@@ -162,7 +163,7 @@ public class WebExchangeDataBinderTests {
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("spouse.name", "test");
formData.add("spouse", "someValue");
this.binder.bind(exchange(formData)).blockMillis(5000);
this.binder.bind(exchange(formData)).block(Duration.ofMillis(5000));
assertNotNull(this.testBean.getSpouse());
assertEquals("test", this.testBean.getSpouse().getName());
......@@ -173,7 +174,7 @@ public class WebExchangeDataBinderTests {
String url = "/path?spouse=someValue&spouse.name=test";
MockServerHttpRequest request = MockServerHttpRequest.post(url).build();
ServerWebExchange exchange = new DefaultServerWebExchange(request, new MockServerHttpResponse());
this.binder.bind(exchange).blockMillis(5000);
this.binder.bind(exchange).block(Duration.ofMillis(5000));
assertNotNull(this.testBean.getSpouse());
assertEquals("test", this.testBean.getSpouse().getName());
......
......@@ -16,6 +16,7 @@
package org.springframework.web.reactive.resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -87,7 +88,7 @@ public class AppCacheManifestTransformerTests {
given(resource.getFilename()).willReturn("foobar.file");
given(this.chain.transform(exchange, resource)).willReturn(Mono.just(resource));
Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000);
Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000));
assertEquals(resource, result);
}
......@@ -98,7 +99,7 @@ public class AppCacheManifestTransformerTests {
Resource resource = new ClassPathResource("test/error.appcache", getClass());
given(this.chain.transform(exchange, resource)).willReturn(Mono.just(resource));
Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000);
Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000));
assertEquals(resource, result);
}
......@@ -119,7 +120,7 @@ public class AppCacheManifestTransformerTests {
this.chain = new DefaultResourceTransformerChain(resolverChain, transformers);
Resource resource = new ClassPathResource("test/test.appcache", getClass());
Resource result = this.transformer.transform(exchange, resource, this.chain).blockMillis(5000);
Resource result = this.transformer.transform(exchange, resource, this.chain).block(Duration.ofMillis(5000));
byte[] bytes = FileCopyUtils.copyToByteArray(result.getInputStream());
String content = new String(bytes, "UTF-8");
......
......@@ -195,17 +195,17 @@ public class ViewResolutionResultHandlerTests {
this.request = MockServerHttpRequest.get("/account").build();
ServerWebExchange exchange = createExchange();
handler.handleResult(exchange, result).blockMillis(5000);
handler.handleResult(exchange, result).block(Duration.ofMillis(5000));
assertResponseBody(exchange, "account: {id=123}");
this.request = MockServerHttpRequest.get("/account/").build();
exchange = createExchange();
handler.handleResult(exchange, result).blockMillis(5000);
handler.handleResult(exchange, result).block(Duration.ofMillis(5000));
assertResponseBody(exchange, "account: {id=123}");
this.request = MockServerHttpRequest.get("/account.123").build();
exchange = createExchange();
handler.handleResult(exchange, result).blockMillis(5000);
handler.handleResult(exchange, result).block(Duration.ofMillis(5000));
assertResponseBody(exchange, "account: {id=123}");
}
......@@ -281,7 +281,7 @@ public class ViewResolutionResultHandlerTests {
this.request = MockServerHttpRequest.get("/account").build();
ServerWebExchange exchange = createExchange();
handler.handleResult(exchange, result).blockMillis(5000);
handler.handleResult(exchange, result).block(Duration.ofMillis(5000));
assertResponseBody(exchange, "account: {" +
"attr1=TestBean[name=Bean1], " +
"attr2=[TestBean[name=Bean1], TestBean[name=Bean2]], " +
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册