提交 45d04056 编写于 作者: R Rossen Stoyanchev

Upgrade to RSocket 1.0 RC3 snapshots and...

take advantage of the symmetrical SocketAcceptor methods now available
on RSocketFactory for both client and server side.
上级 a0d50a54
......@@ -30,7 +30,7 @@ ext {
log4jVersion = "2.12.0"
nettyVersion = "4.1.38.Final"
reactorVersion = "Dysprosium-M3"
rsocketVersion = "1.0.0-RC2"
rsocketVersion = "1.0.0-RC3-SNAPSHOT"
rxjavaVersion = "1.3.8"
rxjavaAdapterVersion = "1.2.1"
rxjava2Version = "2.2.10"
......@@ -118,6 +118,7 @@ configure(allprojects.findAll { (it.name != "framework-bom") } ) { project ->
mavenCentral()
maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" } // Reactor
maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket
mavenLocal()
}
......
......@@ -19,8 +19,6 @@ package org.springframework.messaging.rsocket.annotation.support;
import java.lang.reflect.AnnotatedElement;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
......@@ -53,18 +51,15 @@ import org.springframework.util.RouteMatcher;
import org.springframework.util.StringUtils;
/**
* Extension of {@link MessageMappingMessageHandler} for use in RSocket as a
* responder that handles requests with {@link ConnectMapping @ConnectMapping}
* and {@link MessageMapping @MessageMapping} methods.
* <p>For RSocket servers use {@link #serverResponder()} to obtain a
* {@link SocketAcceptor} to register with
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory ServerRSocketFactory}.
* <p>For RSocket clients use {@link #clientResponder()} to obtain an adapter
* to register with
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory},
* or use the static shortcut
* {@link #clientResponder(RSocketStrategies, Object...)} to obtain a configurer
* for {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory}.
* Extension of {@link MessageMappingMessageHandler} for handling RSocket
* requests with {@link ConnectMapping @ConnectMapping} and
* {@link MessageMapping @MessageMapping} methods.
* <p>Use {@link #responder()} to obtain a {@link SocketAcceptor} adapter to
* plug in as responder into an {@link io.rsocket.RSocketFactory}.
* <p>Use {@link #clientResponder(RSocketStrategies, Object...)} to obtain a
* client responder configurer
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory
* RSocketRequester}.
*
* @author Rossen Stoyanchev
* @since 5.2
......@@ -313,10 +308,11 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
}
/**
* Return an adapter for a server side
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor)
* acceptor} that delegate to this {@link RSocketMessageHandler} for
* handling.
* Return an adapter for a {@link SocketAcceptor} that delegates to this
* {@code RSocketMessageHandler} instance. The adapter can be plugged in as a
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(SocketAcceptor) client} or
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
* side responder.
* <p>The initial {@link ConnectionSetupPayload} can be handled with a
* {@link ConnectMapping @ConnectionMapping} method which can be asynchronous
* and return {@code Mono<Void>} with an error signal preventing the
......@@ -325,7 +321,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* <p>Subsequent requests on the connection can be handled with
* {@link MessageMapping MessageMapping} methods.
*/
public SocketAcceptor serverResponder() {
public SocketAcceptor responder() {
return (setupPayload, sendingRSocket) -> {
MessagingRSocket responder;
try {
......@@ -338,25 +334,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
};
}
/**
* Return an adapter for a client side responder that can be used to set
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function)}.
* The responder delegates requests to this {@code RSocketMessageHandler}
* for handling via {@code @MessageMapping} methods.
* <p>The initial {@link ConnectionSetupPayload} can be accessed through a
* {@link ConnectMapping @ConnectionMapping} method, but such a method is
* only a callback just before the connection is made and cannot "accept"
* or prevent the connection. Such a method can also start requests to the
* server but must do so decoupled from handling and the current thread.
*/
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientResponder() {
return (setupPayload, sendingRSocket) -> {
MessagingRSocket responder = createResponder(setupPayload, sendingRSocket);
responder.handleConnectionSetupPayload(setupPayload).subscribe();
return responder;
};
}
private MessagingRSocket createResponder(ConnectionSetupPayload setupPayload, RSocket rsocket) {
String str = setupPayload.dataMimeType();
MimeType dataMimeType = StringUtils.hasText(str) ? MimeTypeUtils.parseMimeType(str) : this.defaultDataMimeType;
......@@ -389,7 +366,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}.
* <p>In effect a shortcut to create and initialize
* {@code RSocketMessageHandler} with the given strategies and handlers,
* and use {@link #clientResponder()} to obtain the responder.
* use {@link #responder()} to obtain the responder, and plug that into
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory}.
* For more advanced scenarios, e.g. discovering handlers through a custom
* stereotype annotation, consider declaring {@code RSocketMessageHandler}
* as a bean, and then obtain the responder from it.
......@@ -415,7 +393,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
handler.setHandlers(handlers);
handler.setRSocketStrategies(strategies);
handler.afterPropertiesSet();
rsocketFactory.acceptor(handler.clientResponder());
rsocketFactory.acceptor(handler.responder());
};
}
}
......@@ -76,7 +76,7 @@ public class RSocketBufferLeakTests {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
......
......@@ -63,7 +63,7 @@ public class RSocketClientToServerIntegrationTests {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.addResponderPlugin(interceptor)
......
......@@ -60,7 +60,7 @@ public class RSocketServerToClientIntegrationTests {
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
......
......@@ -202,7 +202,7 @@ class RSocketClientToServerCoroutinesIntegrationTests {
server = RSocketFactory.receive()
.addResponderPlugin(interceptor)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler::class.java).serverResponder())
.acceptor(context.getBean(RSocketMessageHandler::class.java).responder())
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block()!!
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册