提交 ea274ebc 编写于 作者: R Rossen Stoyanchev

Fix decoding issue in Reactor TcpClient

When decoding STOMP messages unread portions of a given input ByteBuf
must be kept until more input is received and the next complete STOMP
frame can be parsed.

In Reactor Net 2.x this was handled for us through the "remainder"
field in NettyChannelHandlerBridge. The Reactor Netty 0.6 upgrade
however applied only a simple map operator on the input ByteBuf
after which the buffer is relased.

This commit replaces the use of a simple map operator for decoding
and installs a ByteToMessageDecoder in the Netty channel pipeline
which has a built-in ability to preserve and merge unread input into
subsequent input buffers.
上级 fdf88c97
......@@ -15,19 +15,27 @@
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import java.nio.ByteBuffer;
import java.util.List;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCodec;
/**
* {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and
* {@link StompEncoder}.
* Simple delegation to StompDecoder and StompEncoder.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
class StompReactorNettyCodec extends ReactorNettyCodec<byte[]> {
class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec<byte[]> {
private final StompDecoder decoder;
private final StompEncoder encoder;
public StompReactorNettyCodec() {
this(new StompDecoder(), new StompEncoder());
this(new StompDecoder());
}
public StompReactorNettyCodec(StompDecoder decoder) {
......@@ -35,8 +43,18 @@ class StompReactorNettyCodec extends ReactorNettyCodec<byte[]> {
}
public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) {
super(byteBuf -> decoder.decode(byteBuf.nioBuffer()),
(byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message)));
this.decoder = decoder;
this.encoder = encoder;
}
@Override
protected List<Message<byte[]>> decodeInternal(ByteBuffer nioBuffer) {
return this.decoder.decode(nioBuffer);
}
protected ByteBuffer encodeInternal(Message<byte[]> message) {
return ByteBuffer.wrap(this.encoder.encode(message));
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.springframework.messaging.Message;
/**
* Convenient base class for {@link ReactorNettyCodec} implementations that need
* to work with NIO {@link ByteBuffer}s.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class AbstractNioBufferReactorNettyCodec<P> implements ReactorNettyCodec<P> {
@Override
public Collection<Message<P>> decode(ByteBuf inputBuffer) {
ByteBuffer nioBuffer = inputBuffer.nioBuffer();
int start = nioBuffer.position();
List<Message<P>> messages = decodeInternal(nioBuffer);
inputBuffer.skipBytes(nioBuffer.position() - start);
return messages;
}
protected abstract List<Message<P>> decodeInternal(ByteBuffer nioBuffer);
@Override
public void encode(Message<P> message, ByteBuf outputBuffer) {
outputBuffer.writeBytes(encodeInternal(message));
}
protected abstract ByteBuffer encodeInternal(Message<P> message);
}
......@@ -22,7 +22,6 @@ import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* Simple holder for a decoding {@link Function} and an encoding
......@@ -31,28 +30,20 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactorNettyCodec<P> {
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
public ReactorNettyCodec(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
BiConsumer<? super ByteBuf, ? super Message<P>> encoder) {
Assert.notNull(decoder, "'decoder' is required");
Assert.notNull(encoder, "'encoder' is required");
this.decoder = decoder;
this.encoder = encoder;
}
public Function<? super ByteBuf, ? extends Collection<Message<P>>> getDecoder() {
return this.decoder;
}
public BiConsumer<? super ByteBuf, ? super Message<P>> getEncoder() {
return this.encoder;
}
public interface ReactorNettyCodec<P> {
/**
* Decode the input {@link ByteBuf} into one or more {@link Message}s.
* @param inputBuffer the input buffer to decode from
* @return 0 or more decoded messages
*/
Collection<Message<P>> decode(ByteBuf inputBuffer);
/**
* Encode the given {@link Message} to the output {@link ByteBuf}.
* @param message the message the encode
* @param outputBuffer the buffer to write to
*/
void encode(Message<P> message, ByteBuf outputBuffer);
}
......@@ -16,13 +16,18 @@
package org.springframework.messaging.tcp.reactor;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
......@@ -39,6 +44,7 @@ import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.util.concurrent.QueueSupplier;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
......@@ -170,6 +176,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
this.connectionHandler = handler;
}
@SuppressWarnings("unchecked")
@Override
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
......@@ -177,10 +184,11 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
inbound.receive()
.map(codec.getDecoder())
inbound.context().addDecoder(new StompMessageDecoder<>(codec));
inbound.receiveObject()
.cast(Message.class)
.publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE)
.flatMapIterable(Function.identity())
.subscribe(
connectionHandler::handleMessage,
connectionHandler::handleFailure,
......@@ -190,4 +198,19 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
}
private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
private final ReactorNettyCodec<P> codec;
public StompMessageDecoder(ReactorNettyCodec<P> codec) {
this.codec = codec;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Collection<Message<P>> messages = codec.decode(in);
out.addAll(messages);
}
}
}
......@@ -58,7 +58,7 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override
public ListenableFuture<Void> send(Message<P> message) {
ByteBuf byteBuf = this.outbound.alloc().buffer();
this.codec.getEncoder().accept(byteBuf, message);
this.codec.encode(message, byteBuf);
Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then();
return new MonoToListenableFutureAdapter<>(sendCompletion);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册