提交 f69c1530 编写于 作者: oldratlee's avatar oldratlee 🔥

Merge branch 'master' of https://github.com/AlibabaTech/dubbo

......@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.Map;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.common.utils.ReflectUtils;
......@@ -41,6 +43,8 @@ import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeIn
*/
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);
private Channel channel;
private byte serializationType;
......@@ -66,6 +70,9 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
......@@ -101,7 +108,9 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
try {
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
e.printStackTrace();
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
......
......@@ -21,6 +21,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Type;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.common.utils.StringUtils;
......@@ -38,6 +40,8 @@ import com.alibaba.dubbo.rpc.support.RpcUtils;
*/
public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);
private Channel channel;
private byte serializationType;
......@@ -104,6 +108,9 @@ public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc result failed: " + e.getMessage(), e);
}
response.setStatus(Response.CLIENT_ERROR);
response.setErrorMessage(StringUtils.toString(e));
} finally {
......
......@@ -17,14 +17,14 @@ package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.serialize.ObjectOutput;
import com.alibaba.dubbo.common.serialize.Serialization;
......@@ -50,6 +50,8 @@ import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeIn
*/
public class DubboCodec extends ExchangeCodec implements Codec {
private static final Logger log = LoggerFactory.getLogger(DubboCodec.class);
public static final String NAME = "dubbo";
public static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion());
......@@ -64,27 +66,6 @@ public class DubboCodec extends ExchangeCodec implements Codec {
public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];
@Override
public Object decode(Channel channel, InputStream is) throws IOException {
List<Object> result = new ArrayList<Object>();
do{
Object obj = super.decode(channel, is);
if (NEED_MORE_INPUT == obj) {
break;
} else {
result.add(obj);
}
} while (true);
if (result.isEmpty()) {
return NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
......@@ -118,6 +99,9 @@ public class DubboCodec extends ExchangeCodec implements Codec {
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
......@@ -151,6 +135,9 @@ public class DubboCodec extends ExchangeCodec implements Codec {
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
......
......@@ -19,11 +19,11 @@ package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.io.CountInputStream;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.exchange.Request;
......@@ -36,8 +36,6 @@ import com.alibaba.dubbo.rpc.RpcResult;
*/
public final class DubboCountCodec implements Codec {
private static final Logger log = LoggerFactory.getLogger(DubboCountCodec.class);
private DubboCodec codec = new DubboCodec();
public void encode(Channel channel, OutputStream output, Object msg) throws IOException {
......@@ -46,25 +44,44 @@ public final class DubboCountCodec implements Codec {
public Object decode(Channel channel, InputStream input) throws IOException {
CountInputStream statInputStream = new CountInputStream(input);
Object result = codec.decode(channel, statInputStream);
if (result != NEED_MORE_INPUT) {
if (result instanceof Request) {
try {
((RpcInvocation) ((Request) result).getData()).setAttachment(
Constants.INPUT_KEY, String.valueOf(statInputStream.getReadBytes()));
} catch (Throwable e) {
/* ignore */
}
} else if (result instanceof Response) {
try {
((RpcResult) ((Response) result).getResult()).setAttachment(
Constants.OUTPUT_KEY, String.valueOf(statInputStream.getReadBytes()));
} catch (Throwable e) {
/* ignreo */
}
long save = 0;
List<Object> result = new ArrayList<Object>();
do {
Object obj = codec.decode(channel, statInputStream);
if (NEED_MORE_INPUT == obj) {
break;
} else {
result.add(obj);
logMessageLength(obj, statInputStream.getReadBytes() - save);
save = statInputStream.getReadBytes();
}
} while (true);
if (result.isEmpty()) {
return NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
private void logMessageLength(Object result, long bytes) {
if (bytes <= 0) { return; }
if (result instanceof Request) {
try {
((RpcInvocation) ((Request) result).getData()).setAttachment(
Constants.INPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignore */
}
} else if (result instanceof Response) {
try {
((RpcResult) ((Response) result).getResult()).setAttachment(
Constants.OUTPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignreo */
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册