提交 dab95521 编写于 作者: L liangfei0201

Merge branch 'master' of https://github.com/AlibabaTech/dubbo

......@@ -574,7 +574,9 @@ public class Constants {
public static final String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
public static final String DEFAULT_EXECUTOR_SERVICE_KEY = "threadnotsafe";
public static final String GENERIC_SERIALIZATION_JAVA = "java";
public static final String GENERIC_SERIALIZATION_DEFAULT = "true";
/*
* private Constants(){ }
......
......@@ -18,12 +18,19 @@ package com.alibaba.dubbo.common.store;
import com.alibaba.dubbo.common.extension.SPI;
import java.util.Map;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
@SPI("threadnotsafe")
@SPI("simple")
public interface DataStore {
/**
* return a snapshot value of componentName
*/
Map<String,Object> get(String componentName);
Object get(String componentName, String key);
void put(String componentName, String key, Object value);
......
......@@ -18,42 +18,49 @@ package com.alibaba.dubbo.common.store.support;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.alibaba.dubbo.common.store.DataStore;
/**
* @author <a href="mailto:ding.lid@alibaba-inc.com">ding.lid</a>
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class ThreadNotSafeDataStore implements DataStore {
public class SimpleDataStore implements DataStore {
// <组件类名或标识, <数据名, 数据值>>
private Map<String, Map<String, Object>> datas =
new HashMap<String, Map<String, Object>>();
private ConcurrentMap<String, ConcurrentMap<String, Object>> data =
new ConcurrentHashMap<String, ConcurrentMap<String,Object>>();
public Map<String, Object> get(String componentName) {
ConcurrentMap<String, Object> value = data.get(componentName);
if(value == null) return new HashMap<String, Object>();
return new HashMap<String, Object>(value);
}
@SuppressWarnings("unchecked")
public Object get(String componentName, String key) {
if (!datas.containsKey(componentName)) {
if (!data.containsKey(componentName)) {
return null;
}
return datas.get(componentName).get(key);
return data.get(componentName).get(key);
}
public void put(String componentName, String key, Object value) {
Map<String, Object> componentDatas = null;
if (!datas.containsKey(componentName)) {
componentDatas = new HashMap<String, Object>();
} else {
componentDatas = datas.get(componentName);
Map<String, Object> componentData = data.get(componentName);
if(null == componentData) {
data.putIfAbsent(componentName, new ConcurrentHashMap<String, Object>());
componentData = data.get(componentName);
}
componentDatas.put(key, value);
datas.put(componentName, componentDatas);
componentData.put(key, value);
}
public void remove(String componentName, String key) {
if (!datas.containsKey(componentName)) {
if (!data.containsKey(componentName)) {
return;
}
datas.get(componentName).remove(key);
data.get(componentName).remove(key);
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class SerializationUtils {
private static final Logger log = LoggerFactory.getLogger(SerializationUtils.class);
private SerializationUtils() {}
public static Object javaDeserialize(byte[] bytes) throws Exception {
ObjectInputStream objectInputStream = new ObjectInputStream(
new ByteArrayInputStream(bytes));
try {
return objectInputStream.readObject();
} finally {
close(objectInputStream);
}
}
public static byte[] javaSerialize(Object obj) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(out);
try {
objectOutputStream.writeObject(obj);
return out.toByteArray();
} finally {
close(objectOutputStream);
}
}
private static void close(Closeable closeable) {
try {
closeable.close();
} catch (IOException e) {
if (log.isWarnEnabled()) {
log.warn("Close closeable failed: " + e.getMessage(), e);
}
}
}
}
threadnotsafe=com.alibaba.dubbo.common.store.support.ThreadNotSafeDataStore
\ No newline at end of file
simple=com.alibaba.dubbo.common.store.support.SimpleDataStore
\ No newline at end of file
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.store.support;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author <a href="mailto:ding.lid@alibaba-inc.com">ding.lid</a>
*/
public class SimpleDataStoreTest {
SimpleDataStore dataStore = new SimpleDataStore();
@Test
public void testPut_Get() throws Exception {
assertNull(dataStore.get("xxx", "yyy"));
dataStore.put("name", "key", "1");
assertEquals("1", dataStore.get("name", "key"));
assertNull(dataStore.get("xxx", "yyy"));
}
@Test
public void testRemove() throws Exception {
dataStore.remove("xxx", "yyy");
dataStore.put("name", "key", "1");
dataStore.remove("name", "key");
assertNull(dataStore.get("name", "key"));
}
}
......@@ -18,6 +18,7 @@ package com.alibaba.dubbo.config;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.config.support.Parameter;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
......@@ -40,7 +41,7 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
protected Boolean init;
// 是否使用泛接口
protected Boolean generic;
protected String generic;
// 优先从JVM内获取引用实例
protected Boolean injvm;
......@@ -79,13 +80,23 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
@Parameter(excluded = true)
public Boolean isGeneric() {
return generic;
return ProtocolUtils.isGeneric(generic);
}
public void setGeneric(Boolean generic) {
if (generic != null) {
this.generic = generic.toString();
}
}
public void setGeneric(String generic) {
this.generic = generic;
}
public String getGeneric() {
return generic;
}
/**
* @return
* @deprecated 通过scope进行判断,scope=local
......
......@@ -94,7 +94,25 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
private transient boolean destroyed;
private final List<URL> urls = new ArrayList<URL>();
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
@Override
protected void finalize() throws Throwable {
super.finalize();
if(! ReferenceConfig.this.destroyed) {
logger.warn("ReferenceConfig(" + url + ") is not destroyed when finalize!");
try {
ReferenceConfig.this.destroy();
} catch (Throwable t) {
logger.warn("Unexpected err when destroy invoker of ReferenceConfig(" + url + ") in finalize method!", t);
}
}
}
};
public ReferenceConfig() {}
public ReferenceConfig(Reference reference) {
......@@ -147,13 +165,10 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
// 获取消费者全局配置
checkDefault();
appendProperties(this);
if (generic == null && consumer != null) {
generic = consumer.isGeneric();
}
if (generic == null) {
generic = false;
if (! isGeneric() && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (generic) {
if (isGeneric()) {
interfaceClass = GenericService.class;
} else {
try {
......@@ -242,7 +257,7 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! generic) {
if (! isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
......@@ -406,9 +421,8 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
if (interfaceClass != null) {
return interfaceClass;
}
if ((generic != null && generic.booleanValue())
|| (consumer != null && consumer.isGeneric() != null
&& consumer.isGeneric().booleanValue())) {
if (isGeneric()
|| (getConsumer() != null && getConsumer().isGeneric())) {
return GenericService.class;
}
try {
......
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.config.utils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.ReferenceConfig;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* a simple util class for cache {@link ReferenceConfig}.
* <p>
* {@link ReferenceConfig} is a heavy Object, it's necessary to cache these object
* for the framework which create {@link ReferenceConfig} frequently.
* <p>
* You can implement and use your own {@link ReferenceConfig} cache if you need use complicate strategy.
*
* @author ding.lid
*/
public class ReferenceConfigCache {
public static final String DEFAULT_NAME = "_DEFAULT_";
static final ConcurrentMap<String, ReferenceConfigCache> cacheHolder = new ConcurrentHashMap<String, ReferenceConfigCache>();
/**
* Get the cache use default name and {@link #DEFAULT_KEY_GENERATOR} to generate cache key.
* Create cache if not existed yet.
*/
public static ReferenceConfigCache getCache() {
return getCache(DEFAULT_NAME);
}
/**
* Get the cache use specified name and {@link KeyGenerator}.
* Create cache if not existed yet.
*/
public static ReferenceConfigCache getCache(String name) {
return getCache(name, DEFAULT_KEY_GENERATOR);
}
/**
* Get the cache use specified {@link KeyGenerator}.
* Create cache if not existed yet.
*/
public static ReferenceConfigCache getCache(String name, KeyGenerator keyGenerator) {
ReferenceConfigCache cache = cacheHolder.get(name);
if(cache != null) {
return cache;
}
cacheHolder.putIfAbsent(name, new ReferenceConfigCache(name, keyGenerator));
return cacheHolder.get(name);
}
public static interface KeyGenerator {
String generateKey(ReferenceConfig<?> referenceConfig);
}
/**
* Create the key with the <b>Group</b>, <b>Interface</b> and <b>version</b> attribute of {@link ReferenceConfig}.
* <p>
* key example: <code>group1/com.alibaba.foo.FooService:1.0.0</code>.
*/
public static final KeyGenerator DEFAULT_KEY_GENERATOR = new KeyGenerator() {
public String generateKey(ReferenceConfig<?> referenceConfig) {
String iName = referenceConfig.getInterface();
if(StringUtils.isBlank(iName)) {
Class<?> clazz = referenceConfig.getInterfaceClass();
iName = clazz.getName();
}
if(StringUtils.isBlank(iName)) {
throw new IllegalArgumentException("No interface info in ReferenceConfig" + referenceConfig);
}
StringBuilder ret = new StringBuilder();
if(! StringUtils.isBlank(referenceConfig.getGroup())) {
ret.append(referenceConfig.getGroup()).append("/");
}
ret.append(iName);
if(! StringUtils.isBlank(referenceConfig.getVersion())) {
ret.append(":").append(referenceConfig.getVersion());
}
return ret.toString();
}
};
private final String name;
private final KeyGenerator generator;
ConcurrentMap<String, ReferenceConfig<?>> cache = new ConcurrentHashMap<String, ReferenceConfig<?>>();
private ReferenceConfigCache(String name, KeyGenerator generator) {
this.name = name;
this.generator = generator;
}
public <T> T get(ReferenceConfig<T> referenceConfig) {
String key = generator.generateKey(referenceConfig);
ReferenceConfig<?> config = cache.get(key);
if(config != null) {
return (T) config.get();
}
cache.putIfAbsent(key, referenceConfig);
config = cache.get(key);
return (T) config.get();
}
void destroyKey(String key) {
ReferenceConfig<?> config = cache.remove(key);
if(config == null) return;
config.destroy();
}
/**
* clear and destroy one {@link ReferenceConfig} in the cache.
* @param referenceConfig use for create key.
*/
public <T> void destroy(ReferenceConfig<T> referenceConfig) {
String key = generator.generateKey(referenceConfig);
destroyKey(key);
}
/**
* clear and destroy all {@link ReferenceConfig} in the cache.
*/
public void destroyAll() {
Set<String> set = new HashSet<String>(cache.keySet());
for(String key : set) {
destroyKey(key);
}
}
}
......@@ -23,6 +23,8 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.utils.SerializationUtils;
import com.alibaba.dubbo.config.api.DemoException;
import com.alibaba.dubbo.config.api.DemoService;
import com.alibaba.dubbo.config.api.User;
......@@ -123,4 +125,36 @@ public class GenericServiceTest {
}
}
@Test
public void testGenericSerializationJava() throws Exception {
ServiceConfig<DemoService> service = new ServiceConfig<DemoService>();
service.setApplication(new ApplicationConfig("generic-provider"));
service.setRegistry(new RegistryConfig("N/A"));
service.setProtocol(new ProtocolConfig("dubbo", 29581));
service.setInterface(DemoService.class.getName());
DemoServiceImpl ref = new DemoServiceImpl();
service.setRef(ref);
service.export();
try {
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
reference.setApplication(new ApplicationConfig("generic-consumer"));
reference.setInterface(DemoService.class);
reference.setUrl("dubbo://127.0.0.1:29581?scope=remote");
reference.setGeneric(Constants.GENERIC_SERIALIZATION_JAVA);
GenericService genericService = reference.get();
try {
String name = "kimi";
byte[] arg = SerializationUtils.javaSerialize(name);
Object obj = genericService.$invoke("sayName", new String[]{String.class.getName()}, new Object[]{arg});
Assert.assertTrue(obj instanceof byte[]);
byte[] result = (byte[])obj;
Assert.assertEquals(ref.sayName(name), SerializationUtils.javaDeserialize(result));
} finally {
reference.destroy();
}
} finally {
service.unexport();
}
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.config.utils;
import com.alibaba.dubbo.config.ReferenceConfig;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author ding.lid
*/
public class MockReferenceConfig extends ReferenceConfig<String> {
static AtomicLong counter = new AtomicLong();
String value;
public boolean isGetMethodRun() {
return value != null;
}
boolean destroyMethodRun = false;
public boolean isDestroyMethodRun() {
return destroyMethodRun;
}
public static void setCounter(long c) {
counter.set(c);
}
@Override
public synchronized String get() {
if(value != null) return value;
value = "" + counter.getAndIncrement();
return value;
}
@Override
public synchronized void destroy() {
destroyMethodRun = true;
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.config.utils;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author ding.lid
*/
public class ReferenceConfigCacheTest {
@Before
public void setUp() throws Exception {
MockReferenceConfig.setCounter(0);
ReferenceConfigCache.cacheHolder.clear();
}
@Test
public void testGetCache_SameReference() throws Exception {
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
{
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
String value = cache.get(config);
assertTrue(config.isGetMethodRun());
assertEquals("0", value);
}
{
MockReferenceConfig configCopy = new MockReferenceConfig();
configCopy.setInterface("FooService");
configCopy.setGroup("group1");
configCopy.setVersion("1.0.0");
String value = cache.get(configCopy);
assertFalse(configCopy.isGetMethodRun());
assertEquals("0", value);
}
}
@Test
public void testGetCache_DiffReference() throws Exception {
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
{
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
String value = cache.get(config);
assertTrue(config.isGetMethodRun());
assertEquals("0", value);
}
{
MockReferenceConfig configCopy = new MockReferenceConfig();
configCopy.setInterface("XxxService");
configCopy.setGroup("group1");
configCopy.setVersion("1.0.0");
String value = cache.get(configCopy);
assertTrue(configCopy.isGetMethodRun());
assertEquals("1", value);
}
}
@Test
public void testGetCache_DiffName() throws Exception {
{
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
String value = cache.get(config);
assertTrue(config.isGetMethodRun());
assertEquals("0", value);
}
{
ReferenceConfigCache cache = ReferenceConfigCache.getCache("foo");
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
String value = cache.get(config);
assertTrue(config.isGetMethodRun()); // ͬCacheͬReferenceConfigҲInit
assertEquals("1", value);
}
}
@Test
public void testDestroy() throws Exception {
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
cache.get(config);
MockReferenceConfig configCopy = new MockReferenceConfig();
configCopy.setInterface("XxxService");
configCopy.setGroup("group1");
configCopy.setVersion("1.0.0");
cache.get(configCopy);
assertEquals(2, cache.cache.size());
cache.destroy(config);
assertTrue(config.isDestroyMethodRun());
assertEquals(1, cache.cache.size());
cache.destroy(configCopy);
assertTrue(configCopy.isDestroyMethodRun());
assertEquals(0, cache.cache.size());
}
@Test
public void testDestroyAll() throws Exception {
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
MockReferenceConfig config = new MockReferenceConfig();
config.setInterface("FooService");
config.setGroup("group1");
config.setVersion("1.0.0");
cache.get(config);
MockReferenceConfig configCopy = new MockReferenceConfig();
configCopy.setInterface("XxxService");
configCopy.setGroup("group1");
configCopy.setVersion("1.0.0");
cache.get(configCopy);
assertEquals(2, cache.cache.size());
cache.destroyAll();
assertTrue(config.isDestroyMethodRun());
assertTrue(configCopy.isDestroyMethodRun());
assertEquals(0, cache.cache.size());
}
}
......@@ -47,8 +47,9 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().put(
Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Constants.DEFAULT_EXECUTOR_SERVICE_KEY, executor);
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()), executor);
}
public void close() {
......
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter;
import java.lang.reflect.Method;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.PojoUtils;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
import java.io.IOException;
import java.lang.reflect.Method;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.PojoUtils;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.SerializationUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
* GenericInvokerFilter.
*
* @author william.liangf
*/
*/
@Activate(group = Constants.PROVIDER, order = -100000)
public class GenericFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& ! invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)) {
&& ! invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
......@@ -52,13 +56,43 @@ public class GenericFilter implements Filter {
if (args == null) {
args = new Object[params.length];
}
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for(int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
args[i] = SerializationUtils.javaDeserialize((byte[]) args[i]);
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(
new StringBuilder(32).append("Generic serialization [")
.append(Constants.GENERIC_SERIALIZATION_JAVA)
.append("] only support message type ")
.append(byte[].class)
.append(" and your message type is ")
.append(args[i].getClass()).toString());
}
}
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
if (result.hasException()
&& ! (result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
return new RpcResult(PojoUtils.generalize(result.getValue()));
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
return new RpcResult(SerializationUtils.javaSerialize(result.getValue()));
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
......
......@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter;
package com.alibaba.dubbo.rpc.filter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
......@@ -33,39 +33,41 @@ import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
/**
* GenericImplInvokerFilter
*
* @author william.liangf
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
/**
* GenericImplInvokerFilter
*
* @author william.liangf
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 100000)
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 100000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[] {String.class, String[].class, Object[].class};
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().getParameter(Constants.GENERIC_KEY, false)
&& ! Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
RpcInvocation invocation2 = (RpcInvocation) invocation;
String methodName = invocation2.getMethodName();
Class<?>[] parameterTypes = invocation2.getParameterTypes();
Object[] arguments = invocation2.getArguments();
String[] types = new String[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i ++) {
types[i] = ReflectUtils.getName(parameterTypes[i]);
}
Object[] args = PojoUtils.generalize(arguments);
invocation2.setMethodName(Constants.$INVOKE);
invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
invocation2.setArguments(new Object[] {methodName, types, args});
Result result = invoker.invoke(invocation2);
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[] {String.class, String[].class, Object[].class};
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& ! Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
RpcInvocation invocation2 = (RpcInvocation) invocation;
String methodName = invocation2.getMethodName();
Class<?>[] parameterTypes = invocation2.getParameterTypes();
Object[] arguments = invocation2.getArguments();
String[] types = new String[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i ++) {
types[i] = ReflectUtils.getName(parameterTypes[i]);
}
Object[] args = PojoUtils.generalize(arguments);
invocation2.setMethodName(Constants.$INVOKE);
invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
invocation2.setArguments(new Object[] {methodName, types, args});
Result result = invoker.invoke(invocation2);
if (! result.hasException()) {
Object value = result.getValue();
try {
......@@ -111,10 +113,40 @@ public class GenericImplFilter implements Filter {
} catch (Throwable e) {
throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e);
}
}
return result;
}
return invoker.invoke(invocation);
}
}
return result;
}
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(arg.getClass().getName());
}
}
}
((RpcInvocation)invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
private void error(String type) throws RpcException {
throw new RpcException(
new StringBuilder(32)
.append("Generic serialization [")
.append(Constants.GENERIC_SERIALIZATION_JAVA)
.append("] only support message type ")
.append(byte[].class)
.append(" and your message type is ")
.append(type).toString());
}
}
\ No newline at end of file
......@@ -32,4 +32,20 @@ public class ProtocolUtils {
return buf.toString();
}
public static boolean isGeneric(String generic) {
return generic != null
&& !"".equals(generic)
&& (Constants.GENERIC_SERIALIZATION_DEFAULT.equalsIgnoreCase(generic) /* 正常的泛化调用 */
|| Constants.GENERIC_SERIALIZATION_JAVA.equalsIgnoreCase(generic)); /* 支持java序列化的流式泛化调用 */
}
public static boolean isDefaultGenericSerialization(String generic) {
return isGeneric(generic)
&& Constants.GENERIC_SERIALIZATION_DEFAULT.equalsIgnoreCase(generic);
}
public static boolean isJavaGenericSerialization(String generic) {
return isGeneric(generic)
&& Constants.GENERIC_SERIALIZATION_JAVA.equalsIgnoreCase(generic);
}
}
......@@ -15,6 +15,7 @@
*/
package com.alibaba.dubbo.rpc.protocol.dubbo.status;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -34,20 +35,37 @@ import com.alibaba.dubbo.common.store.DataStore;
public class ThreadPoolStatusChecker implements StatusChecker {
public Status check() {
ExecutorService executor = (ExecutorService) ExtensionLoader
.getExtensionLoader(DataStore.class).getDefaultExtension()
.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Constants.DEFAULT_EXECUTOR_SERVICE_KEY);
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
return new Status(ok ? Status.Level.OK : Status.Level.WARN,
"max:" + tp.getMaximumPoolSize()
+ ",core:" + tp.getCorePoolSize()
+ ",largest:" + tp.getLargestPoolSize()
+ ",active:" + tp.getActiveCount()
+ ",task:" + tp.getTaskCount());
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return new Status(Status.Level.UNKNOWN);
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册