提交 85c93f5d 编写于 作者: R Rossen Stoyanchev

Polish Reactor Netty TCP client support

上级 870f61fd
......@@ -29,11 +29,10 @@ import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A STOMP over TCP client that uses
* {@link ReactorNettyTcpClient}.
* A STOMP over TCP client that uses {@link ReactorNettyTcpClient}.
*
* @author Rossen Stoyanchev
* @since 4.2
* @since 5.0
*/
public class ReactorNettyTcpStompClient extends StompClientSupport {
......@@ -99,25 +98,21 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
* Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for
* encoding, decoding and hand-off.
*
* @param relayHost target host
* @param relayPort target port
* @param host target host
* @param port target port
* @param decoder {@link StompDecoder} to use
* @return a new {@link TcpOperations}
*/
protected static TcpOperations<byte[]> create(String relayHost,
int relayPort,
StompDecoder decoder) {
return new ReactorNettyTcpClient<>(relayHost,
relayPort,
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction(
decoder),
protected static TcpOperations<byte[]> create(String host, int port, StompDecoder decoder) {
return new ReactorNettyTcpClient<>(host, port,
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(
new DecodingFunction(decoder),
new EncodingConsumer(new StompEncoder()),
128,
Schedulers.newParallel("StompClient")));
}
private static final class EncodingConsumer
implements BiConsumer<ByteBuf, Message<byte[]>> {
private static final class EncodingConsumer implements BiConsumer<ByteBuf, Message<byte[]>> {
private final StompEncoder encoder;
......@@ -127,12 +122,11 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
@Override
public void accept(ByteBuf byteBuf, Message<byte[]> message) {
byteBuf.writeBytes(encoder.encode(message));
byteBuf.writeBytes(this.encoder.encode(message));
}
}
private static final class DecodingFunction
implements Function<ByteBuf, List<Message<byte[]>>> {
private static final class DecodingFunction implements Function<ByteBuf, List<Message<byte[]>>> {
private final StompDecoder decoder;
......
......@@ -17,7 +17,6 @@
package org.springframework.messaging.tcp.reactor;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -33,51 +32,53 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
import org.springframework.util.concurrent.SuccessCallback;
/**
* Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting
* the result Object type {@code <S>} to the expected target type {@code <T>}.
* Adapts {@link Mono} to {@link ListenableFuture} optionally converting the
* result Object type {@code <S>} to the expected target type {@code <T>}.
*
* @author Rossen Stoyanchev
* @since 4.0
* @since 5.0
* @param <S> the type of object expected from the {@link Mono}
* @param <T> the type of object expected from the {@link ListenableFuture}
*/
abstract class AbstractMonoToListenableFutureAdapter<S, T>
implements ListenableFuture<T> {
abstract class AbstractMonoToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
private final MonoProcessor<S> promise;
private final MonoProcessor<S> monoProcessor;
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
protected AbstractMonoToListenableFutureAdapter(Mono<S> promise) {
Assert.notNull(promise, "Mono must not be null");
this.promise = promise.doOnSuccess(result -> {
T adapted;
try {
adapted = adapt(result);
}
catch (Throwable ex) {
registry.failure(ex);
return;
}
registry.success(adapted);
})
.doOnError(registry::failure)
.subscribe();
protected AbstractMonoToListenableFutureAdapter(Mono<S> mono) {
Assert.notNull(mono, "'mono' must not be null");
this.monoProcessor = mono
.doOnSuccess(result -> {
T adapted;
try {
adapted = adapt(result);
}
catch (Throwable ex) {
registry.failure(ex);
return;
}
registry.success(adapted);
})
.doOnError(this.registry::failure)
.subscribe();
}
@Override
public T get() throws InterruptedException {
S result = this.promise.block();
S result = this.monoProcessor.block();
return adapt(result);
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
Objects.requireNonNull(unit, "unit");
S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert(
timeout,
unit)));
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Assert.notNull(unit);
Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
S result = this.monoProcessor.block(duration);
return adapt(result);
}
......@@ -86,18 +87,18 @@ abstract class AbstractMonoToListenableFutureAdapter<S, T>
if (isCancelled()) {
return false;
}
this.promise.cancel();
this.monoProcessor.cancel();
return true;
}
@Override
public boolean isCancelled() {
return this.promise.isCancelled();
return this.monoProcessor.isCancelled();
}
@Override
public boolean isDone() {
return this.promise.isTerminated();
return this.monoProcessor.isTerminated();
}
@Override
......@@ -111,7 +112,6 @@ abstract class AbstractMonoToListenableFutureAdapter<S, T>
this.registry.addFailureCallback(failureCallback);
}
protected abstract T adapt(S result);
}
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,15 +19,14 @@ package org.springframework.messaging.tcp.reactor;
import reactor.core.publisher.Mono;
/**
* A Mono-to-ListenableFutureAdapter where the source and the target from
* A Mono-to-ListenableFuture adapter where the source and the target from
* the Promise and the ListenableFuture respectively are of the same type.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 4.0
* @since 5.0
*/
class MonoToListenableFutureAdapter<T> extends
AbstractMonoToListenableFutureAdapter<T, T> {
class MonoToListenableFutureAdapter<T> extends AbstractMonoToListenableFutureAdapter<T, T> {
public MonoToListenableFutureAdapter(Mono<T> mono) {
......
......@@ -17,7 +17,6 @@
package org.springframework.messaging.tcp.reactor;
import java.util.Collection;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
......@@ -58,16 +57,19 @@ import org.springframework.util.concurrent.ListenableFuture;
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 4.2
* @since 5.0
*/
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final TcpClient tcpClient;
private final TcpClient tcpClient;
private final MessageHandlerConfiguration<P> configuration;
private final ChannelGroup group;
private final ChannelGroup group;
private volatile boolean stopping;
/**
* A constructor that creates a {@link TcpClient TcpClient} factory relying on
* Reactor Netty TCP threads. The number of Netty threads can be tweaked with
......@@ -80,120 +82,116 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* @param port the port to connect to
* @param configuration the client configuration
*/
public ReactorNettyTcpClient(String host,
int port,
MessageHandlerConfiguration<P> configuration) {
this.configuration = Objects.requireNonNull(configuration, "configuration");
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.tcpClient = TcpClient.create(options -> options.connect(host, port)
.channelGroup(group));
public ReactorNettyTcpClient(String host, int port, MessageHandlerConfiguration<P> configuration) {
this(opts -> opts.connect(host, port), configuration);
}
/**
* A constructor with a configurator {@link Consumer} that will receive default {@link
* ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific
* network parameters to the generated client configuration.
* A constructor with a configurator {@link Consumer} that will receive
* default {@link ClientOptions} from {@link TcpClient}. This might be used
* to add SSL or specific network parameters to the generated client
* configuration.
*
* @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by
* connected handlers.
* @param tcpOptions callback for configuring shared {@link ClientOptions}
* @param configuration the client configuration
*/
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions,
MessageHandlerConfiguration<P> configuration) {
this.configuration = Objects.requireNonNull(configuration, "configuration");
Assert.notNull(configuration, "'configuration' is required");
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.tcpClient =
TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
this.configuration = configuration;
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
if (stopping) {
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "'handler' is required");
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
}
MessageHandler<P> handler =
new MessageHandler<>(connectionHandler, configuration);
Mono<Void> promise = tcpClient.newHandler(handler)
.doOnError(connectionHandler::afterConnectFailure)
.then();
Mono<Void> connectMono = this.tcpClient
.newHandler(new MessageHandler<>(handler, this.configuration))
.doOnError(handler::afterConnectFailure)
.then();
return new MonoToListenableFutureAdapter<>(promise);
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler,
ReconnectStrategy strategy) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "'handler' is required");
Assert.notNull(strategy, "'reconnectStrategy' is required");
if (stopping) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
}
MessageHandler<P> handler =
new MessageHandler<>(connectionHandler, configuration);
MonoProcessor<Void> promise = MonoProcessor.create();
tcpClient.newHandler(handler)
.doOnNext(e -> {
if (!promise.isTerminated()) {
promise.onComplete();
}
})
.doOnError(e -> {
if (!promise.isTerminated()) {
promise.onError(e);
}
})
.then(NettyContext::onClose)
.retryWhen(new Reconnector<>(strategy))
.repeatWhen(new Reconnector<>(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(promise);
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient.newHandler(new MessageHandler<>(handler, this.configuration))
.doOnNext(item -> {
if (!connectMono.isTerminated()) {
connectMono.onComplete();
}
})
.doOnError(ex -> {
if (!connectMono.isTerminated()) {
connectMono.onError(ex);
}
})
.then(NettyContext::onClose)
.retryWhen(new Reconnector<>(strategy))
.repeatWhen(new Reconnector<>(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> shutdown() {
if (stopping) {
if (this.stopping) {
return new MonoToListenableFutureAdapter<>(Mono.empty());
}
stopping = true;
this.stopping = true;
Mono<Void> closing = ChannelFutureMono.from(group.close());
Mono<Void> completion = ChannelFutureMono.from(this.group.close());
if (configuration.scheduler != null) {
closing =
closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown());
if (this.configuration.scheduler != null) {
completion = completion.doAfterTerminate((x, e) -> configuration.scheduler.shutdown());
}
return new MonoToListenableFutureAdapter<>(closing);
return new MonoToListenableFutureAdapter<>(completion);
}
/**
* A configuration holder
*/
public static final class MessageHandlerConfiguration<P> {
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
private final int backlog;
private final Scheduler
scheduler;
public MessageHandlerConfiguration(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
private final int backlog;
private final Scheduler scheduler;
public MessageHandlerConfiguration(
Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
int backlog,
Scheduler scheduler) {
int backlog, Scheduler scheduler) {
this.decoder = decoder;
this.encoder = encoder;
this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE;
......@@ -201,34 +199,30 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
}
private static final class MessageHandler<P> implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private static final class MessageHandler<P>
implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private final TcpConnectionHandler<P> connectionHandler;
private final MessageHandlerConfiguration<P> configuration;
MessageHandler(TcpConnectionHandler<P> connectionHandler,
MessageHandlerConfiguration<P> configuration) {
this.connectionHandler = connectionHandler;
this.configuration = configuration;
MessageHandler(TcpConnectionHandler<P> handler, MessageHandlerConfiguration<P> config) {
this.connectionHandler = handler;
this.configuration = config;
}
@Override
public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
Flux<Collection<Message<P>>> inbound = in.receive()
.map(configuration.decoder);
Flux<Collection<Message<P>>> inbound = in.receive().map(configuration.decoder);
DirectProcessor<Void> promise = DirectProcessor.create();
TcpConnection<P> tcpConnection = new ReactorNettyTcpConnection<>(in,
out,
configuration.encoder,
promise);
DirectProcessor<Void> closeProcessor = DirectProcessor.create();
TcpConnection<P> tcpConnection =
new ReactorNettyTcpConnection<>(in, out, configuration.encoder, closeProcessor);
if (configuration.scheduler != null) {
configuration.scheduler.schedule(() -> connectionHandler.afterConnected(
tcpConnection));
inbound =
inbound.publishOn(configuration.scheduler, configuration.backlog);
configuration.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection));
inbound = inbound.publishOn(configuration.scheduler, configuration.backlog);
}
else {
connectionHandler.afterConnected(tcpConnection);
......@@ -239,15 +233,16 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
connectionHandler::handleFailure,
connectionHandler::afterConnectionClosed);
return promise;
return closeProcessor;
}
}
static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
private static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
private final ReconnectStrategy strategy;
Reconnector(ReconnectStrategy strategy) {
this.strategy = strategy;
}
......@@ -255,9 +250,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public Publisher<?> apply(Flux<T> flux) {
return flux.scan(1, (p, e) -> p++)
.doOnCancel(() -> new Exception().printStackTrace())
.flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(
attempt)));
.flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt)));
}
}
......
......@@ -35,47 +35,52 @@ import org.springframework.util.concurrent.ListenableFuture;
* @param <P> the payload type of messages read or written to the TCP stream.
*
* @author Rossen Stoyanchev
* @since 4.2
* @since 5.0
*/
public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
private final NettyInbound in;
private final NettyOutbound out;
private final DirectProcessor<Void> close;
private final NettyInbound inbound;
private final NettyOutbound outbound;
private final DirectProcessor<Void> closeProcessor;
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
public ReactorNettyTcpConnection(NettyInbound in,
NettyOutbound out,
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
DirectProcessor<Void> close) {
this.out = out;
this.in = in;
DirectProcessor<Void> closeProcessor) {
this.inbound = inbound;
this.outbound = outbound;
this.encoder = encoder;
this.close = close;
this.closeProcessor = closeProcessor;
}
@Override
public ListenableFuture<Void> send(Message<P> message) {
ByteBuf byteBuf = in.channel().alloc().buffer();
encoder.accept(byteBuf, message);
return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf)));
ByteBuf byteBuf = this.inbound.channel().alloc().buffer();
this.encoder.accept(byteBuf, message);
return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf)));
}
@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
in.onReadIdle(inactivityDuration, runnable);
this.inbound.onReadIdle(inactivityDuration, runnable);
}
@Override
@SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
out.onWriteIdle(inactivityDuration, runnable);
this.outbound.onWriteIdle(inactivityDuration, runnable);
}
@Override
public void close() {
close.onComplete();
this.closeProcessor.onComplete();
}
}
......@@ -24,10 +24,10 @@ import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.InvalidMimeTypeException;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Test fixture for {@link StompDecoder}.
......@@ -39,6 +39,7 @@ public class StompDecoderTests {
private final StompDecoder decoder = new StompDecoder();
@Test
public void decodeFrameWithCrLfEols() {
Message<byte[]> frame = decode("DISCONNECT\r\n\r\n\0");
......
......@@ -34,10 +34,10 @@ public class StompEncoderTests {
private final StompEncoder encoder = new StompEncoder();
@Test
public void encodeFrameWithNoHeadersAndNoBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame)));
......@@ -48,20 +48,18 @@ public class StompEncoderTests {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
String frameString = new String(encoder.encode(frame));
assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(
frameString));
assertTrue(
"CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(frameString));
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscaped() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
......@@ -72,8 +70,8 @@ public class StompEncoderTests {
public void encodeFrameWithHeadersBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
new String(encoder.encode(frame)));
......@@ -83,8 +81,8 @@ public class StompEncoderTests {
public void encodeFrameWithContentLengthPresent() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
new String(encoder.encode(frame)));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册