提交 52c36be5 编写于 作者: K kimi

DUBBO-8 Codec线程模型优化,如果流中有多个message则在一次decode中把这些message全都decode

上级 fec02024
......@@ -23,12 +23,12 @@ import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate;
import com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class HeartbeatHandler implements ChannelHandlerDelegate {
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class);
......@@ -36,20 +36,8 @@ public class HeartbeatHandler implements ChannelHandlerDelegate {
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
private ChannelHandler handler;
public HeartbeatHandler(ChannelHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate)handler).getHandler();
}
return handler;
super(handler);
}
public void connected(Channel channel) throws RemotingException {
......@@ -100,10 +88,6 @@ public class HeartbeatHandler implements ChannelHandlerDelegate {
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
......
package com.alibaba.dubbo.remoting.transport;
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
protected ChannelHandler handler;
protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
Assert.notNull(handler, "handler == null");
this.handler = handler;
}
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
((ChannelHandlerDelegate)handler).getHandler();
}
return handler;
}
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}
public void received(Channel channel, Object message) throws RemotingException {
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
}
......@@ -29,34 +29,12 @@ import com.alibaba.dubbo.remoting.exchange.Response;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class DecodeHandler implements ChannelHandlerDelegate {
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
private ChannelHandler handler;
public DecodeHandler(ChannelHandler handler) {
Assert.notNull(handler, "handler == null");
this.handler = handler;
}
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate)handler).getHandler();
}
return handler;
}
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
super(handler);
}
public void received(Channel channel, Object message) throws RemotingException {
......@@ -75,10 +53,6 @@ public class DecodeHandler implements ChannelHandlerDelegate {
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
......
package com.alibaba.dubbo.remoting.transport;
import java.util.List;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(MultiMessageHandler.class);
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof List) {
List list = (List)message;
for(Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
}
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Dispather;
import com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler;
import com.alibaba.dubbo.remoting.transport.MultiMessageHandler;
/**
* @author chao.liuc
......@@ -35,8 +36,8 @@ public class ChannelHandlers {
protected ChannelHandlers() {}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class)
.getAdaptiveExtension().dispath(handler, url));
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class)
.getAdaptiveExtension().dispath(handler, url)));
}
private static ChannelHandlers INSTANCE = new ChannelHandlers();
......
......@@ -17,6 +17,8 @@ package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
......@@ -62,6 +64,27 @@ public class DubboCodec extends ExchangeCodec implements Codec {
public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];
@Override
public Object decode(Channel channel, InputStream is) throws IOException {
List<Object> result = new ArrayList<Object>();
do{
Object obj = super.decode(channel, is);
if (NEED_MORE_INPUT == obj) {
break;
} else {
result.add(obj);
}
} while (true);
if (result.isEmpty()) {
return NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册