提交 2e13bf8b 编写于 作者: R Rossen Stoyanchev

Refine Reactor-based TCP client implementation

Configure explicitly use of SynchronousDispatcher instead of the one
used otherwise by default (RingBufferDispatcher). As a result TCP
optations are now scoped to Netty's threads.

Remove Environment field. It is no longer required to shut it down
since we're now using SynchronousDispatcher by default.

Replace connection.in() with connection.consume() when composing
connection handling. The former creates a Stream for further composing,
e.g. via map(), filter() but all we need is to read a message.

Provide additional constructor that aceepts a pre-configured Reactor
TcpClient instance.

Issue: SPR-11531
上级 6bcbb94a
...@@ -32,6 +32,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; ...@@ -32,6 +32,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
...@@ -67,10 +68,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -67,10 +68,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final byte[] EMPTY_PAYLOAD = new byte[0]; private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final Message<byte[]> HEARTBEAT_MESSAGE; // STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3; private static final long HEARTBEAT_MULTIPLIER = 3;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
static { static {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build(); HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build();
...@@ -299,8 +301,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -299,8 +301,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
/** /**
* Configure a TCP client for managing TCP connections to the STOMP broker. By default * Configure a TCP client for managing TCP connections to the STOMP broker.
* {@link org.springframework.messaging.simp.stomp.StompReactorNettyTcpClient} is used. * By default {@link org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient} is used.
*/ */
public void setTcpClient(TcpOperations<byte[]> tcpClient) { public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient; this.tcpClient = tcpClient;
...@@ -323,7 +325,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -323,7 +325,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.brokerChannel.subscribe(this); this.brokerChannel.subscribe(this);
if (this.tcpClient == null) { if (this.tcpClient == null) {
this.tcpClient = new StompReactorNettyTcpClient(this.relayHost, this.relayPort); this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort);
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
...@@ -351,15 +353,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -351,15 +353,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.clientInboundChannel.unsubscribe(this); this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this); this.brokerChannel.unsubscribe(this);
for (StompConnectionHandler handler : this.connectionHandlers.values()) {
try {
handler.clearConnection();
}
catch (Throwable t) {
logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage());
}
}
try { try {
this.tcpClient.shutdown(); this.tcpClient.shutdown();
} }
...@@ -523,7 +516,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -523,7 +516,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(this.sessionId); headers.setSessionId(this.sessionId);
headers.setMessage(errorText); headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); Message<?> errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
sendMessageToClient(errorMessage); sendMessageToClient(errorMessage);
} }
} }
...@@ -754,4 +747,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -754,4 +747,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
} }
private static class StompTcpClientFactory {
public TcpOperations<byte[]> create(String relayHost, int relayPort) {
return new ReactorNettyTcpClient<byte[]>(relayHost, relayPort, new StompCodec());
}
}
} }
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -45,30 +45,59 @@ import reactor.tcp.spec.TcpClientSpec; ...@@ -45,30 +45,59 @@ import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple; import reactor.tuple.Tuple;
import reactor.tuple.Tuple2; import reactor.tuple.Tuple2;
/** /**
* A Reactor/Netty implementation of {@link org.springframework.messaging.tcp.TcpOperations}. * An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the Reactor project.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public class ReactorNettyTcpClient<P> implements TcpOperations<P> { public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private Environment environment;
private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);
private TcpClient<Message<P>, Message<P>> tcpClient; private TcpClient<Message<P>, Message<P>> tcpClient;
/**
* A constructor that creates a {@link reactor.tcp.netty.NettyTcpClient} with
* a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which
* network I/O is handled in Netty threads.
*
* <p>Also see the constructor accepting a pre-configured Reactor
* {@link reactor.tcp.TcpClient}.
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public ReactorNettyTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) { public ReactorNettyTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
this.environment = new Environment(); this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(REACTOR_TCP_CLIENT_TYPE)
this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(NettyTcpClient.class) .env(new Environment())
.env(this.environment)
.codec(codec) .codec(codec)
.connect(host, port) .connect(host, port)
.synchronousDispatcher()
.get(); .get();
} }
/**
* A constructor with a pre-configured {@link reactor.tcp.TcpClient}.
*
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for shutting down the {@link reactor.core.Environment}
* instance with which the client is configured.
*
* @param tcpClient the TcpClient to use
*/
public ReactorNettyTcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
Assert.notNull(tcpClient, "'tcpClient' must not be null");
this.tcpClient = tcpClient;
}
@Override @Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) { public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
...@@ -121,7 +150,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { ...@@ -121,7 +150,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
connectionHandler.afterConnectionClosed(); connectionHandler.afterConnectionClosed();
} }
}); });
connection.in().consume(new Consumer<Message<P>>() { connection.consume(new Consumer<Message<P>>() {
@Override @Override
public void accept(Message<P> message) { public void accept(Message<P> message) {
connectionHandler.handleMessage(message); connectionHandler.handleMessage(message);
...@@ -130,7 +159,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { ...@@ -130,7 +159,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
connection.when(Throwable.class, new Consumer<Throwable>() { connection.when(Throwable.class, new Consumer<Throwable>() {
@Override @Override
public void accept(Throwable t) { public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t); logger.error("Exception on connection " + connectionHandler, t);
} }
}); });
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection)); connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
...@@ -161,18 +190,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { ...@@ -161,18 +190,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override @Override
public ListenableFuture<Void> shutdown() { public ListenableFuture<Void> shutdown() {
try { Promise<Void> promise = this.tcpClient.close();
Promise<Void> promise = this.tcpClient.close(); return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) {
return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) { @Override
@Override protected Void adapt(Void result) {
protected Void adapt(Void result) { return result;
return result; }
} };
};
}
finally {
this.environment.shutdown();
}
} }
} }
\ No newline at end of file
...@@ -68,10 +68,10 @@ public class BrokerMessageHandlerTests { ...@@ -68,10 +68,10 @@ public class BrokerMessageHandlerTests {
} }
@Test @Test
public void stopShouldPublishBrokerAvailabilityEvent() { public void startAndStopShouldNotPublishBrokerAvailabilityEvents() {
this.handler.start(); this.handler.start();
this.handler.stop(); this.handler.stop();
assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); assertEquals(Collections.emptyList(), this.handler.availabilityEvents);
} }
@Test @Test
...@@ -136,11 +136,6 @@ public class BrokerMessageHandlerTests { ...@@ -136,11 +136,6 @@ public class BrokerMessageHandlerTests {
setApplicationEventPublisher(this); setApplicationEventPublisher(this);
} }
@Override
protected void startInternal() {
publishBrokerAvailableEvent();
}
@Override @Override
protected void handleMessageInternal(Message<?> message) { protected void handleMessageInternal(Message<?> message) {
this.messages.add(message); this.messages.add(message);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册