提交 2b125b49 编写于 作者: K kimi

DUBBO-454 支持字节流泛化调用

上级 3c5d9e33
......@@ -574,6 +574,10 @@ public class Constants {
public static final String DEFAULT_EXECUTOR_SERVICE_KEY = "threadnotsafe";
public static final String GENERIC_SERIALIZATION_JAVA = "java";
public static final String GENERIC_SERIALIZATION_DEFAULT = "true";
/*
* private Constants(){ }
*/
......
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class SerializationUtils {
private static final Logger log = LoggerFactory.getLogger(SerializationUtils.class);
private SerializationUtils() {}
public static Object javaDeserialize(byte[] bytes) throws Exception {
ObjectInputStream objectInputStream = new ObjectInputStream(
new ByteArrayInputStream(bytes));
try {
return objectInputStream.readObject();
} finally {
close(objectInputStream);
}
}
public static byte[] javaSerialize(Object obj) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(out);
try {
objectOutputStream.writeObject(obj);
return out.toByteArray();
} finally {
close(objectOutputStream);
}
}
private static void close(Closeable closeable) {
try {
closeable.close();
} catch (IOException e) {
if (log.isWarnEnabled()) {
log.warn("Close closeable failed: " + e.getMessage(), e);
}
}
}
}
......@@ -18,6 +18,7 @@ package com.alibaba.dubbo.config;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.config.support.Parameter;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
......@@ -40,7 +41,7 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
protected Boolean init;
// 是否使用泛接口
protected Boolean generic;
protected String generic;
// 优先从JVM内获取引用实例
protected Boolean injvm;
......@@ -79,13 +80,23 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
@Parameter(excluded = true)
public Boolean isGeneric() {
return generic;
return ProtocolUtils.isGeneric(generic);
}
public void setGeneric(Boolean generic) {
if (generic != null) {
this.generic = generic.toString();
}
}
public void setGeneric(String generic) {
this.generic = generic;
}
public String getGeneric() {
return generic;
}
/**
* @return
* @deprecated 通过scope进行判断,scope=local
......
......@@ -147,13 +147,10 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
// 获取消费者全局配置
checkDefault();
appendProperties(this);
if (generic == null && consumer != null) {
generic = consumer.isGeneric();
if (! isGeneric() && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (generic == null) {
generic = false;
}
if (generic) {
if (isGeneric()) {
interfaceClass = GenericService.class;
} else {
try {
......@@ -242,7 +239,7 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! generic) {
if (! isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
......@@ -406,9 +403,8 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
if (interfaceClass != null) {
return interfaceClass;
}
if ((generic != null && generic.booleanValue())
|| (consumer != null && consumer.isGeneric() != null
&& consumer.isGeneric().booleanValue())) {
if (isGeneric()
|| (getConsumer() != null && getConsumer().isGeneric())) {
return GenericService.class;
}
try {
......
......@@ -23,6 +23,8 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.utils.SerializationUtils;
import com.alibaba.dubbo.config.api.DemoException;
import com.alibaba.dubbo.config.api.DemoService;
import com.alibaba.dubbo.config.api.User;
......@@ -123,4 +125,36 @@ public class GenericServiceTest {
}
}
@Test
public void testGenericSerializationJava() throws Exception {
ServiceConfig<DemoService> service = new ServiceConfig<DemoService>();
service.setApplication(new ApplicationConfig("generic-provider"));
service.setRegistry(new RegistryConfig("N/A"));
service.setProtocol(new ProtocolConfig("dubbo", 29581));
service.setInterface(DemoService.class.getName());
DemoServiceImpl ref = new DemoServiceImpl();
service.setRef(ref);
service.export();
try {
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
reference.setApplication(new ApplicationConfig("generic-consumer"));
reference.setInterface(DemoService.class);
reference.setUrl("dubbo://127.0.0.1:29581?scope=remote");
reference.setGeneric(Constants.GENERIC_SERIALIZATION_JAVA);
GenericService genericService = reference.get();
try {
String name = "kimi";
byte[] arg = SerializationUtils.javaSerialize(name);
Object obj = genericService.$invoke("sayName", new String[]{String.class.getName()}, new Object[]{arg});
Assert.assertTrue(obj instanceof byte[]);
byte[] result = (byte[])obj;
Assert.assertEquals(ref.sayName(name), SerializationUtils.javaDeserialize(result));
} finally {
reference.destroy();
}
} finally {
service.unexport();
}
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter;
import java.lang.reflect.Method;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.PojoUtils;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
import java.io.IOException;
import java.lang.reflect.Method;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.PojoUtils;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.SerializationUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
* GenericInvokerFilter.
*
* @author william.liangf
*/
*/
@Activate(group = Constants.PROVIDER, order = -100000)
public class GenericFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& ! invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)) {
&& ! invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
......@@ -52,13 +56,43 @@ public class GenericFilter implements Filter {
if (args == null) {
args = new Object[params.length];
}
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for(int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
args[i] = SerializationUtils.javaDeserialize((byte[]) args[i]);
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(
new StringBuilder(32).append("Generic serialization [")
.append(Constants.GENERIC_SERIALIZATION_JAVA)
.append("] only support message type ")
.append(byte[].class)
.append(" and your message type is ")
.append(args[i].getClass()).toString());
}
}
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
if (result.hasException()
&& ! (result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
return new RpcResult(PojoUtils.generalize(result.getValue()));
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
return new RpcResult(SerializationUtils.javaSerialize(result.getValue()));
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
......
......@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter;
package com.alibaba.dubbo.rpc.filter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
......@@ -33,39 +33,41 @@ import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
/**
* GenericImplInvokerFilter
*
* @author william.liangf
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
* GenericImplInvokerFilter
*
* @author william.liangf
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 100000)
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 100000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[] {String.class, String[].class, Object[].class};
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)
&& ! Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
RpcInvocation invocation2 = (RpcInvocation) invocation;
String methodName = invocation2.getMethodName();
Class<?>[] parameterTypes = invocation2.getParameterTypes();
Object[] arguments = invocation2.getArguments();
String[] types = new String[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i ++) {
types[i] = ReflectUtils.getName(parameterTypes[i]);
}
Object[] args = PojoUtils.generalize(arguments);
invocation2.setMethodName(Constants.$INVOKE);
invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
invocation2.setArguments(new Object[] {methodName, types, args});
Result result = invoker.invoke(invocation2);
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[] {String.class, String[].class, Object[].class};
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& ! Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
RpcInvocation invocation2 = (RpcInvocation) invocation;
String methodName = invocation2.getMethodName();
Class<?>[] parameterTypes = invocation2.getParameterTypes();
Object[] arguments = invocation2.getArguments();
String[] types = new String[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i ++) {
types[i] = ReflectUtils.getName(parameterTypes[i]);
}
Object[] args = PojoUtils.generalize(arguments);
invocation2.setMethodName(Constants.$INVOKE);
invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
invocation2.setArguments(new Object[] {methodName, types, args});
Result result = invoker.invoke(invocation2);
if (! result.hasException()) {
Object value = result.getValue();
try {
......@@ -111,10 +113,40 @@ public class GenericImplFilter implements Filter {
} catch (Throwable e) {
throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e);
}
}
return result;
}
return invoker.invoke(invocation);
}
}
return result;
}
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(arg.getClass().getName());
}
}
}
((RpcInvocation)invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
private void error(String type) throws RpcException {
throw new RpcException(
new StringBuilder(32)
.append("Generic serialization [")
.append(Constants.GENERIC_SERIALIZATION_JAVA)
.append("] only support message type ")
.append(byte[].class)
.append(" and your message type is ")
.append(type).toString());
}
}
\ No newline at end of file
......@@ -32,4 +32,20 @@ public class ProtocolUtils {
return buf.toString();
}
public static boolean isGeneric(String generic) {
return generic != null
&& !"".equals(generic)
&& (Constants.GENERIC_SERIALIZATION_DEFAULT.equalsIgnoreCase(generic) /* 正常的泛化调用 */
|| Constants.GENERIC_SERIALIZATION_JAVA.equalsIgnoreCase(generic)); /* 支持java序列化的流式泛化调用 */
}
public static boolean isDefaultGenericSerialization(String generic) {
return isGeneric(generic)
&& Constants.GENERIC_SERIALIZATION_DEFAULT.equalsIgnoreCase(generic);
}
public static boolean isJavaGenericSerialization(String generic) {
return isGeneric(generic)
&& Constants.GENERIC_SERIALIZATION_JAVA.equalsIgnoreCase(generic);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册