提交 f83e70b5 编写于 作者: Y YunaiV

dubbo rpc redis 等等

上级 408eeb2a
......@@ -52,14 +52,11 @@ limitations under the License.
<groupId>com.alibaba</groupId>
<artifactId>dubbo-cluster</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>com.alibaba</groupId>-->
<!--<artifactId>dubbo-rpc-http</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>com.alibaba</groupId>-->
<!--<artifactId>dubbo-remoting-http</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-rpc-http</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
......
package com.alibaba.dubbo.demo.provider;
import com.alibaba.dubbo.demo.HttpDemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class HttpConsumer {
public static void main(String[] args) {
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
context.start();
HttpDemoService demoService = (HttpDemoService) context.getBean("demoService"); // get remote service proxy
while (true) {
try {
Thread.sleep(1000);
String hello = demoService.hello("world"); // call remote method
System.out.println(hello); // get result
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
}
......@@ -21,21 +21,17 @@ limitations under the License.
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<dubbo:provider delay="-1" retries="0" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.HttpDemoService" ref="demoService">
</dubbo:service>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="http" port="8080" server="tomcat" />
<!--<dubbo:protocol id="pb" name="dubbo" port="20881"/>-->
<!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
don't set it same as provider -->
<dubbo:application name="demo-consumer" />
<!-- use multicast registry center to discover service -->
<!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.HttpDemoService" >
</dubbo:reference>
</beans>
\ No newline at end of file
......@@ -65,16 +65,27 @@ public class RpcContext {
private final Map<String, String> attachments = new HashMap<String, String>();
private final Map<String, Object> values = new HashMap<String, Object>();
/**
* 异步调用 Future
*/
private Future<?> future;
private List<URL> urls;
/**
* URL 对象
*/
private URL url;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型数组
*/
private Class<?>[] parameterTypes;
/**
* 参数值数组
*/
private Object[] arguments;
/**
* 服务消费者地址
......
......@@ -75,6 +75,7 @@ public abstract class AbstractProtocol implements Protocol {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
@Override
public void destroy() {
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
......
......@@ -32,11 +32,18 @@ import java.util.concurrent.CopyOnWriteArrayList;
/**
* AbstractProxyProtocol
*
* Proxy 协议抽象类
*/
public abstract class AbstractProxyProtocol extends AbstractProtocol {
/**
* 需要抛出的异常类集合,详见 {@link #refer(Class, URL)} 方法。
*/
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
/**
* ProxyFactory 对象
*/
private ProxyFactory proxyFactory;
public AbstractProxyProtocol() {
......@@ -60,18 +67,27 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
this.proxyFactory = proxyFactory;
}
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 获得服务键
final String uri = serviceKey(invoker.getUrl());
// 获得 Exporter 对象。若已经暴露,直接返回。
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
// 执行暴露服务
final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
// 创建 Exporter 对象
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
// 取消暴露
super.unexport();
exporterMap.remove(uri);
// 执行取消暴露的回调
if (runnable != null) {
try {
runnable.run();
......@@ -80,18 +96,26 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
}
}
}
};
// 添加到 Exporter 集合
exporterMap.put(uri, exporter);
return exporter;
}
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
// 执行引用服务
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
// 创建 Invoker 对象
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = tagert.invoke(invocation);
// 调用
Result result = target.invoke(invocation);
// 若返回结果带有异常,并且需要抛出,则抛出异常。
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
......@@ -102,15 +126,19 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
}
return result;
} catch (RpcException e) {
// 若是未知异常,获得异常对应的错误码
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
// 抛出 RpcException 异常
throw getRpcException(type, url, invocation, e);
}
}
};
// 添加到 Invoker 集合。
invokers.add(invoker);
return invoker;
}
......@@ -130,12 +158,37 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
}
/**
* 获得异常对应的错误码
*
* @param e 异常
* @return 错误码
*/
protected int getErrorCode(Throwable e) {
return RpcException.UNKNOWN_EXCEPTION;
}
/**
* 执行暴露,并返回取消暴露的回调 Runnable
*
* @param impl 服务 Proxy 对象
* @param type 服务接口
* @param url URL
* @param <T> 服务接口
* @return 消暴露的回调 Runnable
* @throws RpcException 当发生异常
*/
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
/**
* 执行引用,并返回调用远程服务的 Service 对象
*
* @param type 服务接口
* @param url URL
* @param <T> 服务接口
* @return 调用远程服务的 Service 对象
* @throws RpcException 当发生异常
*/
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
}
......@@ -54,7 +54,7 @@ final public class MockInvoker<T> implements Invoker<T> {
}
public static Object parseMockValue(String mock, Type[] returnTypes) throws Exception {
Object value = null;
Object value;
if ("empty".equals(mock)) {
value = ReflectUtils.getEmptyObject(returnTypes != null && returnTypes.length > 0 ? (Class<?>) returnTypes[0] : null);
} else if ("null".equals(mock)) {
......
......@@ -42,13 +42,26 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* http rpc support.
*
* Hessian 协议实现类
*/
public class HessianProtocol extends AbstractProxyProtocol {
/**
* Http 服务器集合
*
* key:ip:port
*/
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/**
* Spring HttpInvokerServiceExporter 集合
*
* key:path 服务名
*/
private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();
/**
* HttpBinder$Adaptive 对象
*/
private HttpBinder httpBinder;
public HessianProtocol() {
......@@ -63,16 +76,21 @@ public class HessianProtocol extends AbstractProxyProtocol {
return 80;
}
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
// 获得服务器地址
String addr = getAddr(url);
// 获得 HttpServer 对象。若不存在,进行创建。
HttpServer server = serverMap.get(addr);
if (server == null) {
server = httpBinder.bind(url, new HessianHandler());
server = httpBinder.bind(url, new HessianHandler()); // HessianHandler
serverMap.put(addr, server);
}
// 添加到 skeletonMap 中
final String path = url.getAbsolutePath();
HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);
// 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
skeletonMap.remove(path);
......@@ -80,21 +98,27 @@ public class HessianProtocol extends AbstractProxyProtocol {
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
// 创建 HessianProxyFactory 对象
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
// 创建连接器工厂为 HttpClientConnectionFactory 对象,即 Apache HttpClient
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
}
// 设置超时时间
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
// 创建 Service Proxy 对象
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
@Override
protected int getErrorCode(Throwable e) {
if (e instanceof HessianConnectionException) {
if (e.getCause() != null) {
......@@ -110,8 +134,11 @@ public class HessianProtocol extends AbstractProxyProtocol {
return super.getErrorCode(e);
}
@Override
public void destroy() {
// 销毁
super.destroy();
// 销毁 HttpServer
for (String key : new ArrayList<String>(serverMap.keySet())) {
HttpServer server = serverMap.remove(key);
if (server != null) {
......@@ -129,12 +156,16 @@ public class HessianProtocol extends AbstractProxyProtocol {
private class HessianHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
// 获得 HessianSkeleton 对象
HessianSkeleton skeleton = skeletonMap.get(uri);
// 必须是 POST 请求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
// 执行调用
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
......
......@@ -34,6 +34,9 @@ import java.net.URL;
*/
public class HttpClientConnection implements HessianConnection {
/**
* Apache HttpClient
*/
private final HttpClient httpClient;
private final ByteArrayOutputStream output;
......@@ -48,35 +51,43 @@ public class HttpClientConnection implements HessianConnection {
this.request = new HttpPost(url.toString());
}
@Override
public void addHeader(String key, String value) {
request.addHeader(new BasicHeader(key, value));
}
@Override
public OutputStream getOutputStream() throws IOException {
return output;
}
@Override
public void sendRequest() throws IOException {
request.setEntity(new ByteArrayEntity(output.toByteArray()));
this.response = httpClient.execute(request);
}
@Override
public int getStatusCode() {
return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode();
}
@Override
public String getStatusMessage() {
return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase();
}
@Override
public String getContentEncoding() {
return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue();
}
@Override
public InputStream getInputStream() throws IOException {
return response == null || response.getEntity() == null ? null : response.getEntity().getContent();
}
@Override
public void close() throws IOException {
HttpPost request = this.request;
if (request != null) {
......@@ -84,6 +95,7 @@ public class HttpClientConnection implements HessianConnection {
}
}
@Override
public void destroy() throws IOException {
}
......
......@@ -23,7 +23,6 @@ import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.HttpConnectionParams;
import java.io.IOException;
import java.net.URL;
/**
......@@ -31,15 +30,20 @@ import java.net.URL;
*/
public class HttpClientConnectionFactory implements HessianConnectionFactory {
/**
* Apache HttpClient
*/
private final HttpClient httpClient = new DefaultHttpClient();
@Override
public void setHessianProxyFactory(HessianProxyFactory factory) {
HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout());
HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout());
}
public HessianConnection open(URL url) throws IOException {
return new HttpClientConnection(httpClient, url);
@Override
public HessianConnection open(URL url) {
return new HttpClientConnection(httpClient, url); // HttpClientConnection
}
}
\ No newline at end of file
......@@ -42,15 +42,30 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* HttpProtocol
*
* HTTP 协议实现类
*/
public class HttpProtocol extends AbstractProxyProtocol {
/**
* 默认服务器端口
*/
public static final int DEFAULT_PORT = 80;
/**
* Http 服务器集合
*
* key:ip:port
*/
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/**
* Spring HttpInvokerServiceExporter 集合
*
* key:path 服务名
*/
private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();
/**
* HttpBinder$Adaptive 对象
*/
private HttpBinder httpBinder;
public HttpProtocol() {
......@@ -65,13 +80,17 @@ public class HttpProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
// 获得服务器地址
String addr = getAddr(url);
// 获得 HttpServer 对象。若不存在,进行创建。
HttpServer server = serverMap.get(addr);
if (server == null) {
server = httpBinder.bind(url, new InternalHandler());
server = httpBinder.bind(url, new InternalHandler()); // InternalHandler
serverMap.put(addr, server);
}
// 创建 HttpInvokerServiceExporter 对象
final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
httpServiceExporter.setServiceInterface(type);
httpServiceExporter.setService(impl);
......@@ -80,8 +99,10 @@ public class HttpProtocol extends AbstractProxyProtocol {
} catch (Exception e) {
throw new RpcException(e.getMessage(), e);
}
// 添加到 skeletonMap 中
final String path = url.getAbsolutePath();
skeletonMap.put(path, httpServiceExporter);
// 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
skeletonMap.remove(path);
......@@ -89,11 +110,14 @@ public class HttpProtocol extends AbstractProxyProtocol {
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 创建 HttpInvokerProxyFactoryBean 对象
final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
httpProxyFactoryBean.setServiceUrl(url.toIdentityString());
httpProxyFactoryBean.setServiceInterface(serviceType);
// 创建执行器 SimpleHttpInvokerRequestExecutor 对象
String client = url.getParameter(Constants.CLIENT_KEY);
if (client == null || client.length() == 0 || "simple".equals(client)) {
SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
......@@ -105,6 +129,7 @@ public class HttpProtocol extends AbstractProxyProtocol {
}
};
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
// 创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象
} else if ("commons".equals(client)) {
HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
......@@ -113,9 +138,12 @@ public class HttpProtocol extends AbstractProxyProtocol {
throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
}
httpProxyFactoryBean.afterPropertiesSet();
// 返回 HttpInvokerProxyFactoryBean 对象
return (T) httpProxyFactoryBean.getObject();
}
@Override
@SuppressWarnings("Duplicates")
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
......@@ -135,12 +163,15 @@ public class HttpProtocol extends AbstractProxyProtocol {
private class InternalHandler implements HttpHandler {
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException {
String uri = request.getRequestURI();
// 获得 HttpInvokerServiceExporter 对象
HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
// 必须是 POST 请求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
// 执行调用
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
......
......@@ -49,12 +49,15 @@ public class MemcachedProtocol extends AbstractProtocol {
return DEFAULT_PORT;
}
@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl());
}
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
try {
// 创建 MemcachedClient 对象
String address = url.getAddress();
String backup = url.getParameter(Constants.BACKUP_KEY);
if (backup != null && backup.length() > 0) {
......@@ -62,30 +65,38 @@ public class MemcachedProtocol extends AbstractProtocol {
}
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address));
final MemcachedClient memcachedClient = builder.build();
// 处理方法名的映射
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// Memcached get 指令
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
// Memcached set 指令
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
return new RpcResult();
// Memcached delele 指令
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
return new RpcResult();
// 不支持的指令,抛出异常
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
}
......@@ -100,14 +111,18 @@ public class MemcachedProtocol extends AbstractProtocol {
}
}
@Override
public void destroy() {
// 标记销毁
super.destroy();
// 关闭 MemcachedClient
try {
memcachedClient.shutdown();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
......
......@@ -56,6 +56,7 @@ public class RedisProtocol extends AbstractProtocol {
return DEFAULT_PORT;
}
@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl());
}
......@@ -64,8 +65,10 @@ public class RedisProtocol extends AbstractProtocol {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter(Constants.SERIALIZATION_KEY, "java"));
}
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
try {
// 创建 GenericObjectPoolConfig 对象,设置配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
......@@ -86,46 +89,64 @@ public class RedisProtocol extends AbstractProtocol {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
// 创建 JedisPool 对象
final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
// 处理方法名的映射
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
// 创建 Invoker 对象
return new AbstractInvoker<T>(type, url) {
protected Result doInvoke(Invocation invocation) throws Throwable {
@Override
protected Result doInvoke(Invocation invocation) {
Jedis resource = null;
try {
// 获得 Redis Resource
resource = jedisPool.getResource();
// Redis get 指令
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 获得值
byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return new RpcResult();
}
// 反序列化
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
// 返回结果
return new RpcResult(oin.readObject());
// Redis set/put 指令
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 序列化
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
// 设置值
resource.set(key, output.toByteArray());
if (expiry > 1000) {
resource.expire(key, expiry / 1000);
}
// 返回结果
return new RpcResult();
// Redis remote/delete 指令
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 删除值
resource.del(String.valueOf(invocation.getArguments()[0]).getBytes());
// 返回结果
return new RpcResult();
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
......@@ -141,6 +162,7 @@ public class RedisProtocol extends AbstractProtocol {
}
throw re;
} finally {
// 归还 Redis Resource
if (resource != null) {
try {
jedisPool.returnResource(resource);
......@@ -151,14 +173,18 @@ public class RedisProtocol extends AbstractProtocol {
}
}
@Override
public void destroy() {
// 标记销毁
super.destroy();
// 销毁 Redis Pool
try {
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
......
......@@ -35,9 +35,14 @@ import java.rmi.RemoteException;
/**
* RmiProtocol.
*
* RMI 协议实现类
*/
public class RmiProtocol extends AbstractProxyProtocol {
/**
* 默认端口
*/
public static final int DEFAULT_PORT = 1099;
public RmiProtocol() {
......@@ -48,7 +53,9 @@ public class RmiProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
// 创建 RmiServiceExporter 对象
final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();
rmiServiceExporter.setRegistryPort(url.getPort());
rmiServiceExporter.setServiceName(url.getPath());
......@@ -59,6 +66,7 @@ public class RmiProtocol extends AbstractProxyProtocol {
} catch (RemoteException e) {
throw new RpcException(e.getMessage(), e);
}
// 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
try {
......@@ -70,10 +78,13 @@ public class RmiProtocol extends AbstractProxyProtocol {
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 创建 RmiProxyFactoryBean 对象
final RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
// RMI needs extra parameter since it uses customized remote invocation object
// RMI传输时使用自定义的远程执行对象,从而传递额外的参数
if (url.getParameter(Constants.DUBBO_VERSION_KEY, Version.getVersion()).equals(Version.getVersion())) {
// Check dubbo version on provider, this feature only support
rmiProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
......@@ -82,15 +93,18 @@ public class RmiProtocol extends AbstractProxyProtocol {
}
});
}
// 设置相关参数
rmiProxyFactoryBean.setServiceUrl(url.toIdentityString());
rmiProxyFactoryBean.setServiceInterface(serviceType);
rmiProxyFactoryBean.setCacheStub(true);
rmiProxyFactoryBean.setLookupStubOnStartup(true);
rmiProxyFactoryBean.setRefreshStubOnConnectFailure(true);
rmiProxyFactoryBean.afterPropertiesSet();
// 创建 Service Proxy 对象
return (T) rmiProxyFactoryBean.getObject();
}
@Override
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
......
......@@ -25,11 +25,15 @@ import java.util.HashMap;
import java.util.Map;
public class RmiRemoteInvocation extends RemoteInvocation {
private static final long serialVersionUID = 1L;
private static final String dubboAttachmentsAttrName = "dubbo.attachments";
/**
* executed on consumer side
*
* 构造将在消费端执行
*/
public RmiRemoteInvocation(MethodInvocation methodInvocation) {
super(methodInvocation);
......@@ -40,6 +44,8 @@ public class RmiRemoteInvocation extends RemoteInvocation {
* Need to restore context on provider side (Though context will be overridden by Invocation's attachment
* when ContextFilter gets executed, we will restore the attachment when Invocation is constructed, check more
* from {@link com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler}
*
* 服务端执行时,重新放入上下文(虽然这时上下文在ContextFilter执行时将被Invocation的attachments覆盖,我们在Invocation构造时还原attachments, see InvokerInvocationHandler)
*/
@SuppressWarnings("unchecked")
@Override
......
......@@ -50,17 +50,33 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* WebServiceProtocol.
*
* WebService 协议实现类
*/
public class WebServiceProtocol extends AbstractProxyProtocol {
/**
* 默认服务器端口
*/
public static final int DEFAULT_PORT = 80;
/**
* Http 服务器集合
*
* key:ip:port
*/
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/**
* 《我眼中的CXF之Bus》http://jnn.iteye.com/blog/94746
* 《CXF BUS》https://blog.csdn.net/chen_fly2011/article/details/56664908
*/
private final ExtensionManagerBus bus = new ExtensionManagerBus();
/**
*
*/
private final HTTPTransportFactory transportFactory = new HTTPTransportFactory();
/**
* HttpBinder$Adaptive 对象
*/
private HttpBinder httpBinder;
public WebServiceProtocol() {
......@@ -76,13 +92,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
// 获得服务器地址
String addr = getAddr(url);
// 获得 HttpServer 对象。若不存在,进行创建。
HttpServer httpServer = serverMap.get(addr);
if (httpServer == null) {
httpServer = httpBinder.bind(url, new WebServiceHandler());
httpServer = httpBinder.bind(url, new WebServiceHandler()); // WebServiceHandler
serverMap.put(addr, httpServer);
}
// 创建 ServerFactoryBean 对象
final ServerFactoryBean serverFactoryBean = new ServerFactoryBean();
serverFactoryBean.setAddress(url.getAbsolutePath());
serverFactoryBean.setServiceClass(type);
......@@ -90,6 +110,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
serverFactoryBean.setBus(bus);
serverFactoryBean.setDestinationFactory(transportFactory);
serverFactoryBean.create();
// 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
serverFactoryBean.destroy();
......@@ -97,13 +118,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 创建 ClientProxyFactoryBean 对象
ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean();
proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString());
proxyFactoryBean.setServiceClass(serviceType);
proxyFactoryBean.setBus(bus);
// 创建 Service Proxy 对象
T ref = (T) proxyFactoryBean.create();
// 设置超时相关属性
Client proxy = ClientProxy.getClient(ref);
HTTPConduit conduit = (HTTPConduit) proxy.getConduit();
HTTPClientPolicy policy = new HTTPClientPolicy();
......@@ -113,6 +138,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
return ref;
}
@Override
protected int getErrorCode(Throwable e) {
if (e instanceof Fault) {
e = e.getCause();
......@@ -129,7 +155,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
private volatile ServletController servletController;
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// 创建 ServletController 对象,设置使用 DispatcherServlet 。
if (servletController == null) {
HttpServlet httpServlet = DispatcherServlet.getInstance();
if (httpServlet == null) {
......@@ -142,7 +170,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
}
}
}
// 设置调用方地址
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
// 执行调用
servletController.invoke(request, response);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册