diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java index a9239581a8ab0bf7add5fd685505dc3770739040..4816ff8477d5908d8c5957a1cc400574264aca51 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java @@ -23,12 +23,12 @@ import com.alibaba.dubbo.remoting.ChannelHandler; import com.alibaba.dubbo.remoting.RemotingException; import com.alibaba.dubbo.remoting.exchange.Request; import com.alibaba.dubbo.remoting.exchange.Response; -import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate; +import com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate; /** * @author kimi */ -public class HeartbeatHandler implements ChannelHandlerDelegate { +public class HeartbeatHandler extends AbstractChannelHandlerDelegate { static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class); @@ -36,20 +36,8 @@ public class HeartbeatHandler implements ChannelHandlerDelegate { public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; - private ChannelHandler handler; - public HeartbeatHandler(ChannelHandler handler) { - if (handler == null) { - throw new IllegalArgumentException("handler == null"); - } - this.handler = handler; - } - - public ChannelHandler getHandler() { - if (handler instanceof ChannelHandlerDelegate) { - return ((ChannelHandlerDelegate)handler).getHandler(); - } - return handler; + super(handler); } public void connected(Channel channel) throws RemotingException { @@ -100,10 +88,6 @@ public class HeartbeatHandler implements ChannelHandlerDelegate { handler.received(channel, message); } - public void caught(Channel channel, Throwable exception) throws RemotingException { - handler.caught(channel, exception); - } - private void setReadTimestamp(Channel channel) { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractChannelHandlerDelegate.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractChannelHandlerDelegate.java new file mode 100644 index 0000000000000000000000000000000000000000..4fd7bc5b0579a8272cb97e063702c653d29997ee --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractChannelHandlerDelegate.java @@ -0,0 +1,46 @@ +package com.alibaba.dubbo.remoting.transport; + +import com.alibaba.dubbo.common.utils.Assert; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.ChannelHandler; +import com.alibaba.dubbo.remoting.RemotingException; + +/** + * @author kimi + */ +public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate { + + protected ChannelHandler handler; + + protected AbstractChannelHandlerDelegate(ChannelHandler handler) { + Assert.notNull(handler, "handler == null"); + this.handler = handler; + } + + public ChannelHandler getHandler() { + if (handler instanceof ChannelHandlerDelegate) { + ((ChannelHandlerDelegate)handler).getHandler(); + } + return handler; + } + + public void connected(Channel channel) throws RemotingException { + handler.connected(channel); + } + + public void disconnected(Channel channel) throws RemotingException { + handler.disconnected(channel); + } + + public void sent(Channel channel, Object message) throws RemotingException { + handler.sent(channel, message); + } + + public void received(Channel channel, Object message) throws RemotingException { + handler.received(channel, message); + } + + public void caught(Channel channel, Throwable exception) throws RemotingException { + handler.caught(channel, exception); + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java index 837de8a153fac73939a2425a26a48f663318344a..f075c52df65bb963875fd26a76d8257ed60fe81a 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java @@ -29,34 +29,12 @@ import com.alibaba.dubbo.remoting.exchange.Response; /** * @author kimi */ -public class DecodeHandler implements ChannelHandlerDelegate { +public class DecodeHandler extends AbstractChannelHandlerDelegate { private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class); - private ChannelHandler handler; - public DecodeHandler(ChannelHandler handler) { - Assert.notNull(handler, "handler == null"); - this.handler = handler; - } - - public ChannelHandler getHandler() { - if (handler instanceof ChannelHandlerDelegate) { - return ((ChannelHandlerDelegate)handler).getHandler(); - } - return handler; - } - - public void connected(Channel channel) throws RemotingException { - handler.connected(channel); - } - - public void disconnected(Channel channel) throws RemotingException { - handler.disconnected(channel); - } - - public void sent(Channel channel, Object message) throws RemotingException { - handler.sent(channel, message); + super(handler); } public void received(Channel channel, Object message) throws RemotingException { @@ -75,10 +53,6 @@ public class DecodeHandler implements ChannelHandlerDelegate { handler.received(channel, message); } - public void caught(Channel channel, Throwable exception) throws RemotingException { - handler.caught(channel, exception); - } - private void decode(Object message) { if (message != null && message instanceof Decodeable) { try { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/MultiMessageHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/MultiMessageHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..8fc5b50749e14fdf4a0f84a90e576416d95a463e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/MultiMessageHandler.java @@ -0,0 +1,33 @@ +package com.alibaba.dubbo.remoting.transport; + +import java.util.List; + +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.ChannelHandler; +import com.alibaba.dubbo.remoting.RemotingException; + +/** + * @author kimi + */ +public class MultiMessageHandler extends AbstractChannelHandlerDelegate { + + private static final Logger log = LoggerFactory.getLogger(MultiMessageHandler.class); + + public MultiMessageHandler(ChannelHandler handler) { + super(handler); + } + + @Override + public void received(Channel channel, Object message) throws RemotingException { + if (message instanceof List) { + List list = (List)message; + for(Object obj : list) { + handler.received(channel, obj); + } + } else { + handler.received(channel, message); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java index 9263de82c81625d60aa2a1c74ccb4db5ee8da478..3c4b76e1fe8b109faaee02faadfced48df13a3b1 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java @@ -21,6 +21,7 @@ import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.remoting.ChannelHandler; import com.alibaba.dubbo.remoting.Dispather; import com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler; +import com.alibaba.dubbo.remoting.transport.MultiMessageHandler; /** * @author chao.liuc @@ -35,8 +36,8 @@ public class ChannelHandlers { protected ChannelHandlers() {} protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { - return new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class) - .getAdaptiveExtension().dispath(handler, url)); + return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class) + .getAdaptiveExtension().dispath(handler, url))); } private static ChannelHandlers INSTANCE = new ChannelHandlers(); diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java index d10525d564af0177d90ec7851672061f22625676..bd7e707fd14d4cc9f69410b1a62e1217db7cf133 100644 --- a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -17,6 +17,8 @@ package com.alibaba.dubbo.rpc.protocol.dubbo; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; @@ -62,6 +64,27 @@ public class DubboCodec extends ExchangeCodec implements Codec { public static final Class[] EMPTY_CLASS_ARRAY = new Class[0]; + @Override + public Object decode(Channel channel, InputStream is) throws IOException { + List result = new ArrayList(); + do{ + Object obj = super.decode(channel, is); + if (NEED_MORE_INPUT == obj) { + break; + } else { + result.add(obj); + } + } while (true); + + if (result.isEmpty()) { + return NEED_MORE_INPUT; + } + if (result.size() == 1) { + return result.get(0); + } + return result; + } + protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);