diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java index 62e80d51f26f8a4dab85dadef00bd83a93840092..32646f3dc302ed7eab4c380e7af80ce631a4dd2a 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -124,69 +124,8 @@ public class ExchangeCodec extends TelnetCodec { if( readable != tt ) is = StreamUtils.limitedInputStream(is, len); - return decodeBody(channel, is, header); - } - - protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { try { - byte flag = header[2], proto = (byte)( flag & SERIALIZATION_MASK ); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - ObjectInput in = s.deserialize(channel.getUrl(), is); - // get request id. - long id = Bytes.bytes2long(header, 4); - if( ( flag & FLAG_REQUEST ) == 0 ) { - // decode response. - Response res = new Response(id); - if (( flag & FLAG_EVENT ) != 0){ - res.setEvent(Response.HEARTBEAT_EVENT); - } - // get status. - byte status = header[3]; - res.setStatus(status); - if( status == Response.OK ) { - try { - Object data; - if (res.isHeartbeat()) { - data = decodeHeartbeatData(channel, in); - } else if (res.isEvent()) { - data = decodeEventData(channel, in); - } else { - data = decodeResponseData(channel, in, getRequestData(id)); - } - res.setResult(data); - } catch (Throwable t) { - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); - } - } else { - res.setErrorMessage(in.readUTF()); - } - return res; - } else { - // decode request. - Request req = new Request(id); - req.setVersion("2.0.0"); - req.setTwoWay( ( flag & FLAG_TWOWAY ) != 0 ); - if (( flag & FLAG_EVENT ) != 0 ){ - req.setEvent(Request.HEARTBEAT_EVENT); - } - try { - Object data; - if (req.isHeartbeat()) { - data = decodeHeartbeatData(channel, in); - } else if (req.isEvent()) { - data = decodeEventData(channel, in); - } else { - data = decodeRequestData(channel, in); - } - req.setData(data); - } catch (Throwable t) { - // bad request - req.setBroken(true); - req.setData(t); - } - return req; - } + return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { @@ -201,6 +140,67 @@ public class ExchangeCodec extends TelnetCodec { } } + protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { + byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); + Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); + ObjectInput in = s.deserialize(channel.getUrl(), is); + // get request id. + long id = Bytes.bytes2long(header, 4); + if ((flag & FLAG_REQUEST) == 0) { + // decode response. + Response res = new Response(id); + if ((flag & FLAG_EVENT) != 0) { + res.setEvent(Response.HEARTBEAT_EVENT); + } + // get status. + byte status = header[3]; + res.setStatus(status); + if (status == Response.OK) { + try { + Object data; + if (res.isHeartbeat()) { + data = decodeHeartbeatData(channel, in); + } else if (res.isEvent()) { + data = decodeEventData(channel, in); + } else { + data = decodeResponseData(channel, in, getRequestData(id)); + } + res.setResult(data); + } catch (Throwable t) { + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); + } + } else { + res.setErrorMessage(in.readUTF()); + } + return res; + } else { + // decode request. + Request req = new Request(id); + req.setVersion("2.0.0"); + req.setTwoWay((flag & FLAG_TWOWAY) != 0); + if ((flag & FLAG_EVENT) != 0) { + req.setEvent(Request.HEARTBEAT_EVENT); + } + try { + Object data; + if (req.isHeartbeat()) { + data = decodeHeartbeatData(channel, in); + } else if (req.isEvent()) { + data = decodeEventData(channel, in); + } else { + data = decodeRequestData(channel, in); + } + req.setData(data); + } catch (Throwable t) { + // bad request + req.setBroken(true); + req.setData(t); + } + return req; + } + } + protected Object getRequestData(long id) { DefaultFuture future = DefaultFuture.getFuture(id); if (future == null) diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java index 3641377c85a58f6790fb97e4e79a068b61da4b22..6a003d3c1c62b60bc947fc87d8c352cfc5efc1c5 100644 --- a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -88,12 +88,17 @@ public class DubboCodec extends ExchangeCodec implements Codec { } else if (res.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { - DecodeableRpcResult result = new DecodeableRpcResult(channel, res, - new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); + DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { + result = new DecodeableRpcResult(channel, res, is, + (Invocation)getRequestData(id), proto); result.decode(); + } else { + result = new DecodeableRpcResult(channel, res, + new UnsafeByteArrayInputStream(readMessageData(is)), + (Invocation) getRequestData(id), proto); } data = result; } @@ -124,12 +129,15 @@ public class DubboCodec extends ExchangeCodec implements Codec { } else if (req.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { - DecodeableRpcInvocation inv = new DecodeableRpcInvocation(channel, req, - new UnsafeByteArrayInputStream(readMessageData(is)), proto); + DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { + inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); + } else { + inv = new DecodeableRpcInvocation(channel, req, + new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; }