diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index 64d072f9eda128887bfe2df21d5cf38625dca012..ae8f1c3f0b7cee07b2dabe59dea1fa82ff43ef1e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -29,11 +29,10 @@ import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.concurrent.ListenableFuture; /** - * A STOMP over TCP client that uses - * {@link ReactorNettyTcpClient}. + * A STOMP over TCP client that uses {@link ReactorNettyTcpClient}. * * @author Rossen Stoyanchev - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpStompClient extends StompClientSupport { @@ -99,25 +98,21 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { * Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for * encoding, decoding and hand-off. * - * @param relayHost target host - * @param relayPort target port + * @param host target host + * @param port target port * @param decoder {@link StompDecoder} to use * @return a new {@link TcpOperations} */ - protected static TcpOperations create(String relayHost, - int relayPort, - StompDecoder decoder) { - return new ReactorNettyTcpClient<>(relayHost, - relayPort, - new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction( - decoder), + protected static TcpOperations create(String host, int port, StompDecoder decoder) { + return new ReactorNettyTcpClient<>(host, port, + new ReactorNettyTcpClient.MessageHandlerConfiguration<>( + new DecodingFunction(decoder), new EncodingConsumer(new StompEncoder()), 128, Schedulers.newParallel("StompClient"))); } - private static final class EncodingConsumer - implements BiConsumer> { + private static final class EncodingConsumer implements BiConsumer> { private final StompEncoder encoder; @@ -127,12 +122,11 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { @Override public void accept(ByteBuf byteBuf, Message message) { - byteBuf.writeBytes(encoder.encode(message)); + byteBuf.writeBytes(this.encoder.encode(message)); } } - private static final class DecodingFunction - implements Function>> { + private static final class DecodingFunction implements Function>> { private final StompDecoder decoder; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java index 2b5ab32d479b74861b2591f6b727a1e009b291b3..8b6cf5bcf74c918b1cdb6fcaf71fb4a708c07ede 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java @@ -17,7 +17,6 @@ package org.springframework.messaging.tcp.reactor; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,51 +32,53 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; import org.springframework.util.concurrent.SuccessCallback; /** - * Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting - * the result Object type {@code } to the expected target type {@code }. + * Adapts {@link Mono} to {@link ListenableFuture} optionally converting the + * result Object type {@code } to the expected target type {@code }. * * @author Rossen Stoyanchev - * @since 4.0 + * @since 5.0 * @param the type of object expected from the {@link Mono} * @param the type of object expected from the {@link ListenableFuture} */ -abstract class AbstractMonoToListenableFutureAdapter - implements ListenableFuture { +abstract class AbstractMonoToListenableFutureAdapter implements ListenableFuture { - private final MonoProcessor promise; + private final MonoProcessor monoProcessor; private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); - protected AbstractMonoToListenableFutureAdapter(Mono promise) { - Assert.notNull(promise, "Mono must not be null"); - this.promise = promise.doOnSuccess(result -> { - T adapted; - try { - adapted = adapt(result); - } - catch (Throwable ex) { - registry.failure(ex); - return; - } - registry.success(adapted); - }) - .doOnError(registry::failure) - .subscribe(); + + protected AbstractMonoToListenableFutureAdapter(Mono mono) { + Assert.notNull(mono, "'mono' must not be null"); + this.monoProcessor = mono + .doOnSuccess(result -> { + T adapted; + try { + adapted = adapt(result); + } + catch (Throwable ex) { + registry.failure(ex); + return; + } + registry.success(adapted); + }) + .doOnError(this.registry::failure) + .subscribe(); } @Override public T get() throws InterruptedException { - S result = this.promise.block(); + S result = this.monoProcessor.block(); return adapt(result); } @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Objects.requireNonNull(unit, "unit"); - S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert( - timeout, - unit))); + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + Assert.notNull(unit); + Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); + S result = this.monoProcessor.block(duration); return adapt(result); } @@ -86,18 +87,18 @@ abstract class AbstractMonoToListenableFutureAdapter if (isCancelled()) { return false; } - this.promise.cancel(); + this.monoProcessor.cancel(); return true; } @Override public boolean isCancelled() { - return this.promise.isCancelled(); + return this.monoProcessor.isCancelled(); } @Override public boolean isDone() { - return this.promise.isTerminated(); + return this.monoProcessor.isTerminated(); } @Override @@ -111,7 +112,6 @@ abstract class AbstractMonoToListenableFutureAdapter this.registry.addFailureCallback(failureCallback); } - protected abstract T adapt(S result); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java index 8ecab4a974df6db36e9455fbe4b77e3392808230..ad82d685a338817e2ed5132df7632b59ebc95411 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,15 +19,14 @@ package org.springframework.messaging.tcp.reactor; import reactor.core.publisher.Mono; /** - * A Mono-to-ListenableFutureAdapter where the source and the target from + * A Mono-to-ListenableFuture adapter where the source and the target from * the Promise and the ListenableFuture respectively are of the same type. * * @author Rossen Stoyanchev * @author Stephane Maldini - * @since 4.0 + * @since 5.0 */ -class MonoToListenableFutureAdapter extends - AbstractMonoToListenableFutureAdapter { +class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { public MonoToListenableFutureAdapter(Mono mono) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 0b93368e8ce530014046d7d962bd51c87a04efbc..75df13921b8c14a80c5c43f88a41dc47bb1e0f9f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -17,7 +17,6 @@ package org.springframework.messaging.tcp.reactor; import java.util.Collection; -import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -58,16 +57,19 @@ import org.springframework.util.concurrent.ListenableFuture; * * @author Rossen Stoyanchev * @author Stephane Maldini - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpClient

implements TcpOperations

{ - private final TcpClient tcpClient; + private final TcpClient tcpClient; private final MessageHandlerConfiguration

configuration; - private final ChannelGroup group; + + private final ChannelGroup group; + private volatile boolean stopping; + /** * A constructor that creates a {@link TcpClient TcpClient} factory relying on * Reactor Netty TCP threads. The number of Netty threads can be tweaked with @@ -80,120 +82,116 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @param port the port to connect to * @param configuration the client configuration */ - public ReactorNettyTcpClient(String host, - int port, - MessageHandlerConfiguration

configuration) { - this.configuration = Objects.requireNonNull(configuration, "configuration"); - this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = TcpClient.create(options -> options.connect(host, port) - .channelGroup(group)); + public ReactorNettyTcpClient(String host, int port, MessageHandlerConfiguration

configuration) { + this(opts -> opts.connect(host, port), configuration); } /** - * A constructor with a configurator {@link Consumer} that will receive default {@link - * ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific - * network parameters to the generated client configuration. + * A constructor with a configurator {@link Consumer} that will receive + * default {@link ClientOptions} from {@link TcpClient}. This might be used + * to add SSL or specific network parameters to the generated client + * configuration. * - * @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by - * connected handlers. + * @param tcpOptions callback for configuring shared {@link ClientOptions} * @param configuration the client configuration */ public ReactorNettyTcpClient(Consumer tcpOptions, MessageHandlerConfiguration

configuration) { - this.configuration = Objects.requireNonNull(configuration, "configuration"); + + Assert.notNull(configuration, "'configuration' is required"); this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = - TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + this.configuration = configuration; } + @Override - public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - if (stopping) { + public ListenableFuture connect(final TcpConnectionHandler

handler) { + Assert.notNull(handler, "'handler' is required"); + + if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); + handler.afterConnectFailure(ex); return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - MessageHandler

handler = - new MessageHandler<>(connectionHandler, configuration); - - Mono promise = tcpClient.newHandler(handler) - .doOnError(connectionHandler::afterConnectFailure) - .then(); + Mono connectMono = this.tcpClient + .newHandler(new MessageHandler<>(handler, this.configuration)) + .doOnError(handler::afterConnectFailure) + .then(); - return new MonoToListenableFutureAdapter<>(promise); + return new MonoToListenableFutureAdapter<>(connectMono); } @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler, - ReconnectStrategy strategy) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - Assert.notNull(strategy, "ReconnectStrategy must not be null"); + public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { + Assert.notNull(handler, "'handler' is required"); + Assert.notNull(strategy, "'reconnectStrategy' is required"); - if (stopping) { + if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); + handler.afterConnectFailure(ex); return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - MessageHandler

handler = - new MessageHandler<>(connectionHandler, configuration); - - MonoProcessor promise = MonoProcessor.create(); - - tcpClient.newHandler(handler) - .doOnNext(e -> { - if (!promise.isTerminated()) { - promise.onComplete(); - } - }) - .doOnError(e -> { - if (!promise.isTerminated()) { - promise.onError(e); - } - }) - .then(NettyContext::onClose) - .retryWhen(new Reconnector<>(strategy)) - .repeatWhen(new Reconnector<>(strategy)) - .subscribe(); - - return new MonoToListenableFutureAdapter<>(promise); + MonoProcessor connectMono = MonoProcessor.create(); + + this.tcpClient.newHandler(new MessageHandler<>(handler, this.configuration)) + .doOnNext(item -> { + if (!connectMono.isTerminated()) { + connectMono.onComplete(); + } + }) + .doOnError(ex -> { + if (!connectMono.isTerminated()) { + connectMono.onError(ex); + } + }) + .then(NettyContext::onClose) + .retryWhen(new Reconnector<>(strategy)) + .repeatWhen(new Reconnector<>(strategy)) + .subscribe(); + + return new MonoToListenableFutureAdapter<>(connectMono); } @Override public ListenableFuture shutdown() { - if (stopping) { + if (this.stopping) { return new MonoToListenableFutureAdapter<>(Mono.empty()); } - stopping = true; + this.stopping = true; - Mono closing = ChannelFutureMono.from(group.close()); + Mono completion = ChannelFutureMono.from(this.group.close()); - if (configuration.scheduler != null) { - closing = - closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); + if (this.configuration.scheduler != null) { + completion = completion.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); } - return new MonoToListenableFutureAdapter<>(closing); + return new MonoToListenableFutureAdapter<>(completion); } + /** * A configuration holder */ public static final class MessageHandlerConfiguration

{ private final Function>> decoder; - private final BiConsumer> encoder; - private final int backlog; - private final Scheduler - scheduler; - public MessageHandlerConfiguration(Function>> decoder, + private final BiConsumer> encoder; + + private final int backlog; + + private final Scheduler scheduler; + + + public MessageHandlerConfiguration( + Function>> decoder, BiConsumer> encoder, - int backlog, - Scheduler scheduler) { + int backlog, Scheduler scheduler) { + this.decoder = decoder; this.encoder = encoder; this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE; @@ -201,34 +199,30 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } } - private static final class MessageHandler

implements BiFunction> { + private static final class MessageHandler

+ implements BiFunction> { private final TcpConnectionHandler

connectionHandler; private final MessageHandlerConfiguration

configuration; - MessageHandler(TcpConnectionHandler

connectionHandler, - MessageHandlerConfiguration

configuration) { - this.connectionHandler = connectionHandler; - this.configuration = configuration; + + MessageHandler(TcpConnectionHandler

handler, MessageHandlerConfiguration

config) { + this.connectionHandler = handler; + this.configuration = config; } @Override public Publisher apply(NettyInbound in, NettyOutbound out) { - Flux>> inbound = in.receive() - .map(configuration.decoder); + Flux>> inbound = in.receive().map(configuration.decoder); - DirectProcessor promise = DirectProcessor.create(); - TcpConnection

tcpConnection = new ReactorNettyTcpConnection<>(in, - out, - configuration.encoder, - promise); + DirectProcessor closeProcessor = DirectProcessor.create(); + TcpConnection

tcpConnection = + new ReactorNettyTcpConnection<>(in, out, configuration.encoder, closeProcessor); if (configuration.scheduler != null) { - configuration.scheduler.schedule(() -> connectionHandler.afterConnected( - tcpConnection)); - inbound = - inbound.publishOn(configuration.scheduler, configuration.backlog); + configuration.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection)); + inbound = inbound.publishOn(configuration.scheduler, configuration.backlog); } else { connectionHandler.afterConnected(tcpConnection); @@ -239,15 +233,16 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connectionHandler::handleFailure, connectionHandler::afterConnectionClosed); - return promise; + return closeProcessor; } } - static final class Reconnector implements Function, Publisher> { + private static final class Reconnector implements Function, Publisher> { private final ReconnectStrategy strategy; + Reconnector(ReconnectStrategy strategy) { this.strategy = strategy; } @@ -255,9 +250,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public Publisher apply(Flux flux) { return flux.scan(1, (p, e) -> p++) - .doOnCancel(() -> new Exception().printStackTrace()) - .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt( - attempt))); + .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt))); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 7fe758e842c01df7a85ef371c392cea21bbb1d05..6f4ec74ecbaa9bede17fb969c48d3fdcff961465 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -35,47 +35,52 @@ import org.springframework.util.concurrent.ListenableFuture; * @param

the payload type of messages read or written to the TCP stream. * * @author Rossen Stoyanchev - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpConnection

implements TcpConnection

{ - private final NettyInbound in; - private final NettyOutbound out; - private final DirectProcessor close; + private final NettyInbound inbound; + + private final NettyOutbound outbound; + + private final DirectProcessor closeProcessor; + private final BiConsumer> encoder; - public ReactorNettyTcpConnection(NettyInbound in, - NettyOutbound out, + + public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, BiConsumer> encoder, - DirectProcessor close) { - this.out = out; - this.in = in; + DirectProcessor closeProcessor) { + + this.inbound = inbound; + this.outbound = outbound; this.encoder = encoder; - this.close = close; + this.closeProcessor = closeProcessor; } + @Override public ListenableFuture send(Message

message) { - ByteBuf byteBuf = in.channel().alloc().buffer(); - encoder.accept(byteBuf, message); - return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf))); + ByteBuf byteBuf = this.inbound.channel().alloc().buffer(); + this.encoder.accept(byteBuf, message); + return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf))); } @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - in.onReadIdle(inactivityDuration, runnable); + this.inbound.onReadIdle(inactivityDuration, runnable); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - out.onWriteIdle(inactivityDuration, runnable); + this.outbound.onWriteIdle(inactivityDuration, runnable); } @Override public void close() { - close.onComplete(); + this.closeProcessor.onComplete(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java index 98816ef77fc0d9c2e004f3032f22f4c7962a2e26..f48c6f6d031664011f7ba0b62b25533995711b29 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java @@ -24,10 +24,10 @@ import org.junit.Test; import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.InvalidMimeTypeException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * Test fixture for {@link StompDecoder}. @@ -39,6 +39,7 @@ public class StompDecoderTests { private final StompDecoder decoder = new StompDecoder(); + @Test public void decodeFrameWithCrLfEols() { Message frame = decode("DISCONNECT\r\n\r\n\0"); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java index ef6565cdd3e4316e0f10b55f51333bd9b19fe856..9a218c911df02141638ad3939cface42507697d6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java @@ -34,10 +34,10 @@ public class StompEncoderTests { private final StompEncoder encoder = new StompEncoder(); + @Test public void encodeFrameWithNoHeadersAndNoBody() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame))); @@ -48,20 +48,18 @@ public class StompEncoderTests { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.2"); headers.setHost("github.org"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - String frameString = new String(encoder.encode(frame)); - assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals( - frameString)); + assertTrue( + "CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || + "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(frameString)); } @Test public void encodeFrameWithHeadersThatShouldBeEscaped() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", @@ -72,8 +70,8 @@ public class StompEncoderTests { public void encodeFrameWithHeadersBody() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "alpha"); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + Message frame = MessageBuilder.createMessage( + "Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", new String(encoder.encode(frame))); @@ -83,8 +81,8 @@ public class StompEncoderTests { public void encodeFrameWithContentLengthPresent() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setContentLength(12); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + Message frame = MessageBuilder.createMessage( + "Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\ncontent-length:12\n\nMessage body\0", new String(encoder.encode(frame)));