提交 d2a7d9fa 编写于 作者: R Rossen Stoyanchev

Introduce RSocketClientFactoryConfigurer

The new interface supersedes ClientResponderFactory and is more general,
for any RSocketFactory customization.

DefaultClientResponderFactory implements the new interface and is
renamed to AnnotationClientResponderConfigurer.

See gh-23170
上级 1e9ccdd8
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import io.rsocket.RSocketFactory;
/**
* Strategy to apply some configuration to a
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory}.
* It is given to {@link RSocketRequester.Builder} to initialize the
* {@code RSocketFactory} that's used to connect.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
@FunctionalInterface
public interface ClientRSocketFactoryConfigurer {
/**
* This method is invoked by {@link RSocketRequester.Builder} immediately
* before the call to {@link #configure}, and can be used by implementations
* of this interface that need access to the configured
* {@code RSocketStrategies}.
*/
default void configureWithStrategies(RSocketStrategies strategies) {
}
/**
* Configure the given {@code ClientRSocketFactory}.
* @param rsocketFactory the factory to configure
*/
void configure(RSocketFactory.ClientRSocketFactory rsocketFactory);
}
......@@ -47,13 +47,13 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private MimeType metadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA;
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
@Nullable
private RSocketStrategies strategies;
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
private List<ClientRSocketFactoryConfigurer> rsocketFactoryConfigurers = new ArrayList<>();
@Override
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
......@@ -68,12 +68,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this;
}
@Override
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
this.factoryConfigurers.add(configurer);
return this;
}
@Override
public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) {
this.strategies = strategies;
......@@ -86,6 +80,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this;
}
@Override
public RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer) {
this.rsocketFactoryConfigurers.add(configurer);
return this;
}
@Override
public Mono<RSocketRequester> connectTcp(String host, int port) {
return connect(TcpClientTransport.create(host, port));
......@@ -110,9 +110,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
this.rsocketFactoryConfigurers.forEach(configurer -> {
configurer.configureWithStrategies(rsocketStrategies);
configurer.configure(rsocketFactory);
});
return rsocketFactory.transport(transport)
.start()
......
......@@ -21,7 +21,6 @@ import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ClientTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
......@@ -30,6 +29,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
import org.springframework.util.MimeType;
/**
......@@ -141,18 +141,6 @@ public interface RSocketRequester {
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Configure the {@code ClientRSocketFactory}.
* <p><strong>Note:</strong> This builder provides shortcuts for certain
* {@code ClientRSocketFactory} options it needs to know about such as
* {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}.
* Please, use these shortcuts vs configuring them directly on the
* {@code ClientRSocketFactory} so that the resulting
* {@code RSocketRequester} is aware of those changes.
* @param configurer consumer to customize the factory
*/
RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer);
/**
* Set the {@link RSocketStrategies} to use for access to encoders,
* decoders, and a factory for {@code DataBuffer's}.
......@@ -169,6 +157,20 @@ public interface RSocketRequester {
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <p>See {@link AnnotationClientResponderConfigurer} for configuring a
* client side responder.
* <p><strong>Note:</strong> Do not set {@link #dataMimeType(MimeType)}
* and {@link #metadataMimeType(MimeType)} directly on the
* {@code ClientRSocketFactory}. Use the shortcuts on this builder
* instead since the created {@code RSocketRequester} needs to be aware
* of those settings.
* @param configurer consumer to customize the factory
* @see AnnotationClientResponderConfigurer
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
/**
* Connect to the RSocket server over TCP.
* @param host the server host
......
......@@ -13,34 +13,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket.annotation.support;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import io.rsocket.RSocketFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer;
import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.RouteMatcher;
/**
* Default implementation of {@link ClientResponderFactory}.
* {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder
* that handles requests via annotated handler methods. Effectively a thin layer over
* {@link RSocketMessageHandler} that provides a programmatic way to configure
* it and obtain a responder via {@link RSocketMessageHandler#clientResponder()
* clientResponder()}.
*
* @author Brian Clozel
* @author Rossen Stoyanchev
* @since 5.2
*/
class DefaultClientResponderFactory implements ClientResponderFactory, ClientResponderFactory.Config {
@Nullable
private List<Object> handlers;
public final class AnnotationClientResponderConfigurer implements ClientRSocketFactoryConfigurer {
@Nullable
private RSocketStrategies strategies;
private final List<Object> handlers = new ArrayList<>();
@Nullable
private RouteMatcher routeMatcher;
......@@ -49,72 +51,83 @@ class DefaultClientResponderFactory implements ClientResponderFactory, ClientRes
private MetadataExtractor extractor;
@Nullable
private ReturnValueHandlerConfigurer returnValueHandlerConfigurer;
@Nullable
private ArgumentResolverConfigurer argumentResolverConfigurer;
private RSocketStrategies strategies;
@Override
public ClientResponderFactory handlers(Object... handlers) {
Assert.notEmpty(handlers, "Handlers array must not be empty");
this.handlers = Arrays.asList(handlers);
return this;
private AnnotationClientResponderConfigurer(List<Object> handlers) {
for (Object obj : handlers) {
this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
}
}
@Override
public ClientResponderFactory.Config strategies(RSocketStrategies strategies) {
this.strategies = strategies;
return this;
}
@Override
public ClientResponderFactory.Config routeMatcher(RouteMatcher routeMatcher) {
/**
* Configure the {@link RouteMatcher} to use. This is used to set
* {@link RSocketMessageHandler#setRouteMatcher(RouteMatcher)}.
*/
public AnnotationClientResponderConfigurer routeMatcher(RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
@Override
public ClientResponderFactory.Config metadataExtractor(MetadataExtractor extractor) {
/**
* Configure the {@link MetadataExtractor} to use. This is used to set
* {@link RSocketMessageHandler#setMetadataExtractor(MetadataExtractor)}.
*/
public AnnotationClientResponderConfigurer metadataExtractor(MetadataExtractor extractor) {
this.extractor = extractor;
return this;
}
@Override
public ClientResponderFactory.Config returnValueHandler(ReturnValueHandlerConfigurer configurer) {
this.returnValueHandlerConfigurer = configurer;
/**
* Configure handlers to detect {@code @MessasgeMapping} handler methods on.
* This is used to set {@link RSocketMessageHandler#setHandlers(List)}.
*/
public AnnotationClientResponderConfigurer handlers(Object... handlers) {
this.handlers.addAll(Arrays.asList(handlers));
return this;
}
// Implementation of ClientRSocketFactoryConfigurer
@Override
public ClientResponderFactory.Config argumentResolver(ArgumentResolverConfigurer configurer) {
this.argumentResolverConfigurer = configurer;
return this;
public void configureWithStrategies(RSocketStrategies strategies) {
this.strategies = strategies;
}
@Override
public void accept(RSocketFactory.ClientRSocketFactory clientRSocketFactory) {
Assert.state(this.handlers != null, "No handlers set");
public void configure(RSocketFactory.ClientRSocketFactory factory) {
Assert.notEmpty(this.handlers, "No handlers");
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers);
if (this.strategies != null) {
messageHandler.setRSocketStrategies(this.strategies);
}
if (this.routeMatcher != null) {
messageHandler.setRouteMatcher(this.routeMatcher);
}
if (this.extractor != null) {
messageHandler.setMetadataExtractor(this.extractor);
}
if (this.returnValueHandlerConfigurer != null) {
messageHandler.setReturnValueHandlerConfigurer(this.returnValueHandlerConfigurer);
}
if (this.argumentResolverConfigurer != null) {
messageHandler.setArgumentResolverConfigurer(this.argumentResolverConfigurer);
}
messageHandler.setRSocketStrategies(this.strategies);
messageHandler.afterPropertiesSet();
clientRSocketFactory.acceptor(messageHandler.clientResponder());
factory.acceptor(messageHandler.clientResponder());
}
// Static factory methods
/**
* Create an {@code AnnotationClientResponderConfigurer} with the given handlers
* to check for {@code @MessasgeMapping} handler methods.
*/
public static AnnotationClientResponderConfigurer withHandlers(Object... handlers) {
return new AnnotationClientResponderConfigurer(Arrays.asList(handlers));
}
/**
* Create an {@code AnnotationClientResponderConfigurer} to set up further.
*/
public static AnnotationClientResponderConfigurer create() {
return new AnnotationClientResponderConfigurer(Collections.emptyList());
}
}
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket.annotation.support;
import java.util.function.Consumer;
import io.rsocket.RSocketFactory;
import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer;
import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.RouteMatcher;
/**
* Build and configure a responder on a {@link RSocketFactory.ClientRSocketFactory} in order
* to handle requests sent by the RSocket server to the client.
* <p>This can be configured as a responder on a {@link org.springframework.messaging.rsocket.RSocketRequester}
* being built by passing it as an argument to the
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory} method.
*
* @author Brian Clozel
* @since 5.2
* @see org.springframework.messaging.rsocket.RSocketRequester
*/
public interface ClientResponderFactory extends Consumer<RSocketFactory.ClientRSocketFactory> {
/**
* Create a new {@link ClientResponderFactory.Config} for handling requests with annotated handlers.
*/
static ClientResponderFactory.Config create() {
return new DefaultClientResponderFactory();
}
/**
* Configure the client responder with infrastructure options
* to be applied on the resulting {@link RSocketMessageHandler}.
*/
interface Config {
/**
* Set the {@link RSocketStrategies} to use for access to encoders,
* decoders, and a factory for {@code DataBuffer's}.
* @param strategies the codecs strategies to use
*/
Config strategies(RSocketStrategies strategies);
/**
* Set the {@link RouteMatcher} to use for matching incoming requests.
* <p>If none is set, then the responder will use a default
* {@link org.springframework.util.SimpleRouteMatcher} instance backed
* by and {@link org.springframework.util.AntPathMatcher}.
* @param routeMatcher the route matcher to use with the responder
*/
Config routeMatcher(RouteMatcher routeMatcher);
/**
* Set the {@link MetadataExtractor} to use for extracting information
* from metadata frames.
* @param extractor the metadata extractor to use
*/
Config metadataExtractor(MetadataExtractor extractor);
/**
* Set the {@link ReturnValueHandlerConfigurer} for configuring
* return value handlers.
* @param configurer the configurer to use
*/
Config returnValueHandler(ReturnValueHandlerConfigurer configurer);
/**
* Set the {@link ArgumentResolverConfigurer} for configuring
* argument resolvers.
* @param configurer the configurer to use
*/
Config argumentResolver(ArgumentResolverConfigurer configurer);
/**
* Set the annotated handlers in charge of processing the incoming RSocket requests.
* @param handlers the annotated handlers
*/
ClientResponderFactory handlers(Object... handlers);
}
}
......@@ -63,7 +63,7 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(factoryConfigurer)
......@@ -79,7 +79,7 @@ public class DefaultRSocketRequesterBuilderTests {
.encoder(CharSequenceEncoder.allMimeTypes())
.decoder(StringDecoder.allMimeTypes())
.build();
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketStrategies(strategies)
......@@ -88,7 +88,8 @@ public class DefaultRSocketRequesterBuilderTests {
.connect(this.transport)
.block();
verify(this.transport).connect(anyInt());
verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class));
verify(factoryConfigurer).configureWithStrategies(any(RSocketStrategies.class));
verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class));
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
}
......
......@@ -42,7 +42,7 @@ import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.ClientResponderFactory;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
......@@ -103,22 +103,21 @@ public class RSocketServerToClientIntegrationTests {
ServerController serverController = context.getBean(ServerController.class);
serverController.reset();
RSocketStrategies rSocketStrategies = context.getBean(RSocketStrategies.class);
ClientResponderFactory clientResponder = ClientResponderFactory.create()
.strategies(rSocketStrategies)
.handlers(new ClientHandler());
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
RSocketRequester requester = null;
try {
ClientRSocketFactoryConfigurer responderConfigurer =
AnnotationClientResponderConfigurer.withHandlers(new ClientHandler());
requester = RSocketRequester.builder()
.rsocketFactory(factory -> {
factory.metadataMimeType("text/plain");
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
})
.rsocketFactory(clientResponder)
.rsocketStrategies(rSocketStrategies)
.rsocketFactory(responderConfigurer)
.rsocketStrategies(strategies)
.connectTcp("localhost", server.address().getPort())
.block();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册