From f83e70b53389a064e49babe32e61a5648002a44a Mon Sep 17 00:00:00 2001
From: YunaiV <>
Date: Sun, 8 Apr 2018 22:24:23 +0800
Subject: [PATCH] =?UTF-8?q?dubbo=20rpc=20redis=20=E7=AD=89=E7=AD=89?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
dubbo-demo/dubbo-http-demo-consumer/pom.xml | 13 ++--
.../dubbo/demo/provider/HttpConsumer.java | 30 ++++++++++
...o-provider.xml => dubbo-demo-consumer.xml} | 28 ++++-----
.../com/alibaba/dubbo/rpc/RpcContext.java | 19 ++++--
.../dubbo/rpc/protocol/AbstractProtocol.java | 1 +
.../rpc/protocol/AbstractProxyProtocol.java | 59 ++++++++++++++++++-
.../dubbo/rpc/support/MockInvoker.java | 2 +-
.../rpc/protocol/hessian/HessianProtocol.java | 37 +++++++++++-
.../hessian/HttpClientConnection.java | 12 ++++
.../hessian/HttpClientConnectionFactory.java | 10 +++-
.../dubbo/rpc/protocol/http/HttpProtocol.java | 43 ++++++++++++--
.../protocol/memcached/MemcachedProtocol.java | 15 +++++
.../rpc/protocol/redis/RedisProtocol.java | 30 +++++++++-
.../dubbo/rpc/protocol/rmi/RmiProtocol.java | 14 +++++
.../rpc/protocol/rmi/RmiRemoteInvocation.java | 6 ++
.../webservice/WebServiceProtocol.java | 40 +++++++++++--
16 files changed, 308 insertions(+), 51 deletions(-)
create mode 100644 dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java
rename dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/{dubbo-demo-provider.xml => dubbo-demo-consumer.xml} (64%)
diff --git a/dubbo-demo/dubbo-http-demo-consumer/pom.xml b/dubbo-demo/dubbo-http-demo-consumer/pom.xml
index 7d30d8cb2..f763e83ca 100644
--- a/dubbo-demo/dubbo-http-demo-consumer/pom.xml
+++ b/dubbo-demo/dubbo-http-demo-consumer/pom.xml
@@ -52,14 +52,11 @@ limitations under the License.
com.alibaba
dubbo-cluster
-
-
-
-
-
-
-
-
+
+ com.alibaba
+ dubbo-rpc-http
+
+
org.springframework.boot
spring-boot-starter-web
diff --git a/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java b/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java
new file mode 100644
index 000000000..024383fff
--- /dev/null
+++ b/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java
@@ -0,0 +1,30 @@
+package com.alibaba.dubbo.demo.provider;
+
+import com.alibaba.dubbo.demo.HttpDemoService;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class HttpConsumer {
+
+ public static void main(String[] args) {
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
+ context.start();
+ HttpDemoService demoService = (HttpDemoService) context.getBean("demoService"); // get remote service proxy
+
+ while (true) {
+ try {
+ Thread.sleep(1000);
+
+ String hello = demoService.hello("world"); // call remote method
+ System.out.println(hello); // get result
+
+
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+
+ }
+ }
+
+}
diff --git a/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml b/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
similarity index 64%
rename from dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml
rename to dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
index fb8a97857..1fb64a834 100644
--- a/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml
+++ b/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
@@ -21,21 +21,17 @@ limitations under the License.
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
index a458a68e0..78c6afbbd 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
@@ -65,16 +65,27 @@ public class RpcContext {
private final Map attachments = new HashMap();
private final Map values = new HashMap();
+ /**
+ * 异步调用 Future
+ */
private Future> future;
private List urls;
-
+ /**
+ * URL 对象
+ */
private URL url;
-
+ /**
+ * 方法名
+ */
private String methodName;
-
+ /**
+ * 参数类型数组
+ */
private Class>[] parameterTypes;
-
+ /**
+ * 参数值数组
+ */
private Object[] arguments;
/**
* 服务消费者地址
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java
index 76d9a47c1..6de2687f3 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java
@@ -75,6 +75,7 @@ public abstract class AbstractProtocol implements Protocol {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
+ @Override
public void destroy() {
for (Invoker> invoker : invokers) {
if (invoker != null) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java
index 9eb3d8f0d..ffcd2b521 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java
@@ -32,11 +32,18 @@ import java.util.concurrent.CopyOnWriteArrayList;
/**
* AbstractProxyProtocol
+ *
+ * Proxy 协议抽象类
*/
public abstract class AbstractProxyProtocol extends AbstractProtocol {
+ /**
+ * 需要抛出的异常类集合,详见 {@link #refer(Class, URL)} 方法。
+ */
private final List> rpcExceptions = new CopyOnWriteArrayList>();
-
+ /**
+ * ProxyFactory 对象
+ */
private ProxyFactory proxyFactory;
public AbstractProxyProtocol() {
@@ -60,18 +67,27 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
this.proxyFactory = proxyFactory;
}
+ @Override
@SuppressWarnings("unchecked")
public Exporter export(final Invoker invoker) throws RpcException {
+ // 获得服务键
final String uri = serviceKey(invoker.getUrl());
+ // 获得 Exporter 对象。若已经暴露,直接返回。
Exporter exporter = (Exporter) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
+ // 执行暴露服务
final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
+ // 创建 Exporter 对象
exporter = new AbstractExporter(invoker) {
+
+ @Override
public void unexport() {
+ // 取消暴露
super.unexport();
exporterMap.remove(uri);
+ // 执行取消暴露的回调
if (runnable != null) {
try {
runnable.run();
@@ -80,18 +96,26 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
}
}
}
+
};
+ // 添加到 Exporter 集合
exporterMap.put(uri, exporter);
return exporter;
}
+ @Override
public Invoker refer(final Class type, final URL url) throws RpcException {
- final Invoker tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
+ // 执行引用服务
+ final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url);
+ // 创建 Invoker 对象
Invoker invoker = new AbstractInvoker(type, url) {
+
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
- Result result = tagert.invoke(invocation);
+ // 调用
+ Result result = target.invoke(invocation);
+ // 若返回结果带有异常,并且需要抛出,则抛出异常。
Throwable e = result.getException();
if (e != null) {
for (Class> rpcException : rpcExceptions) {
@@ -102,15 +126,19 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
}
return result;
} catch (RpcException e) {
+ // 若是未知异常,获得异常对应的错误码
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
+ // 抛出 RpcException 异常
throw getRpcException(type, url, invocation, e);
}
}
+
};
+ // 添加到 Invoker 集合。
invokers.add(invoker);
return invoker;
}
@@ -130,12 +158,37 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol {
return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
}
+ /**
+ * 获得异常对应的错误码
+ *
+ * @param e 异常
+ * @return 错误码
+ */
protected int getErrorCode(Throwable e) {
return RpcException.UNKNOWN_EXCEPTION;
}
+ /**
+ * 执行暴露,并返回取消暴露的回调 Runnable
+ *
+ * @param impl 服务 Proxy 对象
+ * @param type 服务接口
+ * @param url URL
+ * @param 服务接口
+ * @return 消暴露的回调 Runnable
+ * @throws RpcException 当发生异常
+ */
protected abstract Runnable doExport(T impl, Class type, URL url) throws RpcException;
+ /**
+ * 执行引用,并返回调用远程服务的 Service 对象
+ *
+ * @param type 服务接口
+ * @param url URL
+ * @param 服务接口
+ * @return 调用远程服务的 Service 对象
+ * @throws RpcException 当发生异常
+ */
protected abstract T doRefer(Class type, URL url) throws RpcException;
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java
index f572ad0f1..965c6e6d4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java
@@ -54,7 +54,7 @@ final public class MockInvoker implements Invoker {
}
public static Object parseMockValue(String mock, Type[] returnTypes) throws Exception {
- Object value = null;
+ Object value;
if ("empty".equals(mock)) {
value = ReflectUtils.getEmptyObject(returnTypes != null && returnTypes.length > 0 ? (Class>) returnTypes[0] : null);
} else if ("null".equals(mock)) {
diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java
index 82e1dd70e..499c3e826 100644
--- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java
@@ -42,13 +42,26 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* http rpc support.
+ *
+ * Hessian 协议实现类
*/
public class HessianProtocol extends AbstractProxyProtocol {
+ /**
+ * Http 服务器集合
+ *
+ * key:ip:port
+ */
private final Map serverMap = new ConcurrentHashMap();
-
+ /**
+ * Spring HttpInvokerServiceExporter 集合
+ *
+ * key:path 服务名
+ */
private final Map skeletonMap = new ConcurrentHashMap();
-
+ /**
+ * HttpBinder$Adaptive 对象
+ */
private HttpBinder httpBinder;
public HessianProtocol() {
@@ -63,16 +76,21 @@ public class HessianProtocol extends AbstractProxyProtocol {
return 80;
}
+ @Override
protected Runnable doExport(T impl, Class type, URL url) throws RpcException {
+ // 获得服务器地址
String addr = getAddr(url);
+ // 获得 HttpServer 对象。若不存在,进行创建。
HttpServer server = serverMap.get(addr);
if (server == null) {
- server = httpBinder.bind(url, new HessianHandler());
+ server = httpBinder.bind(url, new HessianHandler()); // HessianHandler
serverMap.put(addr, server);
}
+ // 添加到 skeletonMap 中
final String path = url.getAbsolutePath();
HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);
+ // 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
skeletonMap.remove(path);
@@ -80,21 +98,27 @@ public class HessianProtocol extends AbstractProxyProtocol {
};
}
+ @Override
@SuppressWarnings("unchecked")
protected T doRefer(Class serviceType, URL url) throws RpcException {
+ // 创建 HessianProxyFactory 对象
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
+ // 创建连接器工厂为 HttpClientConnectionFactory 对象,即 Apache HttpClient
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
}
+ // 设置超时时间
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
+ // 创建 Service Proxy 对象
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
+ @Override
protected int getErrorCode(Throwable e) {
if (e instanceof HessianConnectionException) {
if (e.getCause() != null) {
@@ -110,8 +134,11 @@ public class HessianProtocol extends AbstractProxyProtocol {
return super.getErrorCode(e);
}
+ @Override
public void destroy() {
+ // 销毁
super.destroy();
+ // 销毁 HttpServer
for (String key : new ArrayList(serverMap.keySet())) {
HttpServer server = serverMap.remove(key);
if (server != null) {
@@ -129,12 +156,16 @@ public class HessianProtocol extends AbstractProxyProtocol {
private class HessianHandler implements HttpHandler {
+ @Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
+ // 获得 HessianSkeleton 对象
HessianSkeleton skeleton = skeletonMap.get(uri);
+ // 必须是 POST 请求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
+ // 执行调用
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java
index 52ac3f5d9..8d35f31ad 100644
--- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java
+++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java
@@ -34,6 +34,9 @@ import java.net.URL;
*/
public class HttpClientConnection implements HessianConnection {
+ /**
+ * Apache HttpClient
+ */
private final HttpClient httpClient;
private final ByteArrayOutputStream output;
@@ -48,35 +51,43 @@ public class HttpClientConnection implements HessianConnection {
this.request = new HttpPost(url.toString());
}
+ @Override
public void addHeader(String key, String value) {
request.addHeader(new BasicHeader(key, value));
}
+ @Override
public OutputStream getOutputStream() throws IOException {
return output;
}
+ @Override
public void sendRequest() throws IOException {
request.setEntity(new ByteArrayEntity(output.toByteArray()));
this.response = httpClient.execute(request);
}
+ @Override
public int getStatusCode() {
return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode();
}
+ @Override
public String getStatusMessage() {
return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase();
}
+ @Override
public String getContentEncoding() {
return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue();
}
+ @Override
public InputStream getInputStream() throws IOException {
return response == null || response.getEntity() == null ? null : response.getEntity().getContent();
}
+ @Override
public void close() throws IOException {
HttpPost request = this.request;
if (request != null) {
@@ -84,6 +95,7 @@ public class HttpClientConnection implements HessianConnection {
}
}
+ @Override
public void destroy() throws IOException {
}
diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java
index 7a366e589..54e977585 100644
--- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java
+++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java
@@ -23,7 +23,6 @@ import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.HttpConnectionParams;
-import java.io.IOException;
import java.net.URL;
/**
@@ -31,15 +30,20 @@ import java.net.URL;
*/
public class HttpClientConnectionFactory implements HessianConnectionFactory {
+ /**
+ * Apache HttpClient
+ */
private final HttpClient httpClient = new DefaultHttpClient();
+ @Override
public void setHessianProxyFactory(HessianProxyFactory factory) {
HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout());
HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout());
}
- public HessianConnection open(URL url) throws IOException {
- return new HttpClientConnection(httpClient, url);
+ @Override
+ public HessianConnection open(URL url) {
+ return new HttpClientConnection(httpClient, url); // HttpClientConnection
}
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java b/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java
index 5593c7ae8..4e1eb0dd7 100644
--- a/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java
@@ -42,15 +42,30 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* HttpProtocol
+ *
+ * HTTP 协议实现类
*/
public class HttpProtocol extends AbstractProxyProtocol {
+ /**
+ * 默认服务器端口
+ */
public static final int DEFAULT_PORT = 80;
-
+ /**
+ * Http 服务器集合
+ *
+ * key:ip:port
+ */
private final Map serverMap = new ConcurrentHashMap();
-
+ /**
+ * Spring HttpInvokerServiceExporter 集合
+ *
+ * key:path 服务名
+ */
private final Map skeletonMap = new ConcurrentHashMap();
-
+ /**
+ * HttpBinder$Adaptive 对象
+ */
private HttpBinder httpBinder;
public HttpProtocol() {
@@ -65,13 +80,17 @@ public class HttpProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
+ @Override
protected Runnable doExport(final T impl, Class type, URL url) throws RpcException {
+ // 获得服务器地址
String addr = getAddr(url);
+ // 获得 HttpServer 对象。若不存在,进行创建。
HttpServer server = serverMap.get(addr);
if (server == null) {
- server = httpBinder.bind(url, new InternalHandler());
+ server = httpBinder.bind(url, new InternalHandler()); // InternalHandler
serverMap.put(addr, server);
}
+ // 创建 HttpInvokerServiceExporter 对象
final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
httpServiceExporter.setServiceInterface(type);
httpServiceExporter.setService(impl);
@@ -80,8 +99,10 @@ public class HttpProtocol extends AbstractProxyProtocol {
} catch (Exception e) {
throw new RpcException(e.getMessage(), e);
}
+ // 添加到 skeletonMap 中
final String path = url.getAbsolutePath();
skeletonMap.put(path, httpServiceExporter);
+ // 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
skeletonMap.remove(path);
@@ -89,11 +110,14 @@ public class HttpProtocol extends AbstractProxyProtocol {
};
}
+ @Override
@SuppressWarnings("unchecked")
protected T doRefer(final Class serviceType, final URL url) throws RpcException {
+ // 创建 HttpInvokerProxyFactoryBean 对象
final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
httpProxyFactoryBean.setServiceUrl(url.toIdentityString());
httpProxyFactoryBean.setServiceInterface(serviceType);
+ // 创建执行器 SimpleHttpInvokerRequestExecutor 对象
String client = url.getParameter(Constants.CLIENT_KEY);
if (client == null || client.length() == 0 || "simple".equals(client)) {
SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
@@ -105,6 +129,7 @@ public class HttpProtocol extends AbstractProxyProtocol {
}
};
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
+ // 创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象
} else if ("commons".equals(client)) {
HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
@@ -113,9 +138,12 @@ public class HttpProtocol extends AbstractProxyProtocol {
throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
}
httpProxyFactoryBean.afterPropertiesSet();
+ // 返回 HttpInvokerProxyFactoryBean 对象
return (T) httpProxyFactoryBean.getObject();
}
+ @Override
+ @SuppressWarnings("Duplicates")
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
@@ -135,12 +163,15 @@ public class HttpProtocol extends AbstractProxyProtocol {
private class InternalHandler implements HttpHandler {
- public void handle(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
+ @Override
+ public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException {
String uri = request.getRequestURI();
+ // 获得 HttpInvokerServiceExporter 对象
HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
+ // 必须是 POST 请求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
+ // 执行调用
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
diff --git a/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java b/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java
index ff0bc9702..126eb35e8 100644
--- a/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java
@@ -49,12 +49,15 @@ public class MemcachedProtocol extends AbstractProtocol {
return DEFAULT_PORT;
}
+ @Override
public Exporter export(final Invoker invoker) throws RpcException {
throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl());
}
+ @Override
public Invoker refer(final Class type, final URL url) throws RpcException {
try {
+ // 创建 MemcachedClient 对象
String address = url.getAddress();
String backup = url.getParameter(Constants.BACKUP_KEY);
if (backup != null && backup.length() > 0) {
@@ -62,30 +65,38 @@ public class MemcachedProtocol extends AbstractProtocol {
}
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address));
final MemcachedClient memcachedClient = builder.build();
+
+ // 处理方法名的映射
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker(type, url) {
+
+ @Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
+ // Memcached get 指令
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
+ // Memcached set 指令
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
return new RpcResult();
+ // Memcached delele 指令
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
return new RpcResult();
+ // 不支持的指令,抛出异常
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
}
@@ -100,14 +111,18 @@ public class MemcachedProtocol extends AbstractProtocol {
}
}
+ @Override
public void destroy() {
+ // 标记销毁
super.destroy();
+ // 关闭 MemcachedClient
try {
memcachedClient.shutdown();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+
};
} catch (Throwable t) {
throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
diff --git a/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java b/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java
index bd0aad38e..794bf5c75 100644
--- a/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java
@@ -56,6 +56,7 @@ public class RedisProtocol extends AbstractProtocol {
return DEFAULT_PORT;
}
+ @Override
public Exporter export(final Invoker invoker) throws RpcException {
throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl());
}
@@ -64,8 +65,10 @@ public class RedisProtocol extends AbstractProtocol {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter(Constants.SERIALIZATION_KEY, "java"));
}
+ @Override
public Invoker refer(final Class type, final URL url) throws RpcException {
try {
+ // 创建 GenericObjectPoolConfig 对象,设置配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
@@ -86,46 +89,64 @@ public class RedisProtocol extends AbstractProtocol {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
+ // 创建 JedisPool 对象
final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
+
+ // 处理方法名的映射
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
+
+ // 创建 Invoker 对象
return new AbstractInvoker(type, url) {
- protected Result doInvoke(Invocation invocation) throws Throwable {
+
+ @Override
+ protected Result doInvoke(Invocation invocation) {
Jedis resource = null;
try {
+ // 获得 Redis Resource
resource = jedisPool.getResource();
-
+ // Redis get 指令
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
+ // 获得值
byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return new RpcResult();
}
+ // 反序列化
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
+ // 返回结果
return new RpcResult(oin.readObject());
+ // Redis set/put 指令
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
+ // 序列化
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
+ // 设置值
resource.set(key, output.toByteArray());
if (expiry > 1000) {
resource.expire(key, expiry / 1000);
}
+ // 返回结果
return new RpcResult();
+ // Redis remote/delete 指令
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
+ // 删除值
resource.del(String.valueOf(invocation.getArguments()[0]).getBytes());
+ // 返回结果
return new RpcResult();
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
@@ -141,6 +162,7 @@ public class RedisProtocol extends AbstractProtocol {
}
throw re;
} finally {
+ // 归还 Redis Resource
if (resource != null) {
try {
jedisPool.returnResource(resource);
@@ -151,14 +173,18 @@ public class RedisProtocol extends AbstractProtocol {
}
}
+ @Override
public void destroy() {
+ // 标记销毁
super.destroy();
+ // 销毁 Redis Pool
try {
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+
};
} catch (Throwable t) {
throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
diff --git a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java
index 20f4e992e..bf3b3d194 100644
--- a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java
@@ -35,9 +35,14 @@ import java.rmi.RemoteException;
/**
* RmiProtocol.
+ *
+ * RMI 协议实现类
*/
public class RmiProtocol extends AbstractProxyProtocol {
+ /**
+ * 默认端口
+ */
public static final int DEFAULT_PORT = 1099;
public RmiProtocol() {
@@ -48,7 +53,9 @@ public class RmiProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
+ @Override
protected Runnable doExport(final T impl, Class type, URL url) throws RpcException {
+ // 创建 RmiServiceExporter 对象
final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();
rmiServiceExporter.setRegistryPort(url.getPort());
rmiServiceExporter.setServiceName(url.getPath());
@@ -59,6 +66,7 @@ public class RmiProtocol extends AbstractProxyProtocol {
} catch (RemoteException e) {
throw new RpcException(e.getMessage(), e);
}
+ // 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
try {
@@ -70,10 +78,13 @@ public class RmiProtocol extends AbstractProxyProtocol {
};
}
+ @Override
@SuppressWarnings("unchecked")
protected T doRefer(final Class serviceType, final URL url) throws RpcException {
+ // 创建 RmiProxyFactoryBean 对象
final RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
// RMI needs extra parameter since it uses customized remote invocation object
+ // RMI传输时使用自定义的远程执行对象,从而传递额外的参数
if (url.getParameter(Constants.DUBBO_VERSION_KEY, Version.getVersion()).equals(Version.getVersion())) {
// Check dubbo version on provider, this feature only support
rmiProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
@@ -82,15 +93,18 @@ public class RmiProtocol extends AbstractProxyProtocol {
}
});
}
+ // 设置相关参数
rmiProxyFactoryBean.setServiceUrl(url.toIdentityString());
rmiProxyFactoryBean.setServiceInterface(serviceType);
rmiProxyFactoryBean.setCacheStub(true);
rmiProxyFactoryBean.setLookupStubOnStartup(true);
rmiProxyFactoryBean.setRefreshStubOnConnectFailure(true);
rmiProxyFactoryBean.afterPropertiesSet();
+ // 创建 Service Proxy 对象
return (T) rmiProxyFactoryBean.getObject();
}
+ @Override
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
diff --git a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java
index 26e0ec8dd..ecee685e9 100644
--- a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java
@@ -25,11 +25,15 @@ import java.util.HashMap;
import java.util.Map;
public class RmiRemoteInvocation extends RemoteInvocation {
+
private static final long serialVersionUID = 1L;
+
private static final String dubboAttachmentsAttrName = "dubbo.attachments";
/**
* executed on consumer side
+ *
+ * 构造将在消费端执行
*/
public RmiRemoteInvocation(MethodInvocation methodInvocation) {
super(methodInvocation);
@@ -40,6 +44,8 @@ public class RmiRemoteInvocation extends RemoteInvocation {
* Need to restore context on provider side (Though context will be overridden by Invocation's attachment
* when ContextFilter gets executed, we will restore the attachment when Invocation is constructed, check more
* from {@link com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler}
+ *
+ * 服务端执行时,重新放入上下文(虽然这时上下文在ContextFilter执行时将被Invocation的attachments覆盖,我们在Invocation构造时还原attachments, see InvokerInvocationHandler)
*/
@SuppressWarnings("unchecked")
@Override
diff --git a/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java b/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java
index 04450cad3..19258b815 100644
--- a/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java
@@ -50,17 +50,33 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* WebServiceProtocol.
+ *
+ * WebService 协议实现类
*/
public class WebServiceProtocol extends AbstractProxyProtocol {
+ /**
+ * 默认服务器端口
+ */
public static final int DEFAULT_PORT = 80;
-
+ /**
+ * Http 服务器集合
+ *
+ * key:ip:port
+ */
private final Map serverMap = new ConcurrentHashMap();
-
+ /**
+ * 《我眼中的CXF之Bus》http://jnn.iteye.com/blog/94746
+ * 《CXF BUS》https://blog.csdn.net/chen_fly2011/article/details/56664908
+ */
private final ExtensionManagerBus bus = new ExtensionManagerBus();
-
+ /**
+ *
+ */
private final HTTPTransportFactory transportFactory = new HTTPTransportFactory();
-
+ /**
+ * HttpBinder$Adaptive 对象
+ */
private HttpBinder httpBinder;
public WebServiceProtocol() {
@@ -76,13 +92,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
return DEFAULT_PORT;
}
+ @Override
protected Runnable doExport(T impl, Class type, URL url) throws RpcException {
+ // 获得服务器地址
String addr = getAddr(url);
+ // 获得 HttpServer 对象。若不存在,进行创建。
HttpServer httpServer = serverMap.get(addr);
if (httpServer == null) {
- httpServer = httpBinder.bind(url, new WebServiceHandler());
+ httpServer = httpBinder.bind(url, new WebServiceHandler()); // WebServiceHandler
serverMap.put(addr, httpServer);
}
+ // 创建 ServerFactoryBean 对象
final ServerFactoryBean serverFactoryBean = new ServerFactoryBean();
serverFactoryBean.setAddress(url.getAbsolutePath());
serverFactoryBean.setServiceClass(type);
@@ -90,6 +110,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
serverFactoryBean.setBus(bus);
serverFactoryBean.setDestinationFactory(transportFactory);
serverFactoryBean.create();
+ // 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
serverFactoryBean.destroy();
@@ -97,13 +118,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
};
}
+ @Override
@SuppressWarnings("unchecked")
protected T doRefer(final Class serviceType, final URL url) throws RpcException {
+ // 创建 ClientProxyFactoryBean 对象
ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean();
proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString());
proxyFactoryBean.setServiceClass(serviceType);
proxyFactoryBean.setBus(bus);
+ // 创建 Service Proxy 对象
T ref = (T) proxyFactoryBean.create();
+ // 设置超时相关属性
Client proxy = ClientProxy.getClient(ref);
HTTPConduit conduit = (HTTPConduit) proxy.getConduit();
HTTPClientPolicy policy = new HTTPClientPolicy();
@@ -113,6 +138,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
return ref;
}
+ @Override
protected int getErrorCode(Throwable e) {
if (e instanceof Fault) {
e = e.getCause();
@@ -129,7 +155,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
private volatile ServletController servletController;
+ @Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ // 创建 ServletController 对象,设置使用 DispatcherServlet 。
if (servletController == null) {
HttpServlet httpServlet = DispatcherServlet.getInstance();
if (httpServlet == null) {
@@ -142,7 +170,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol {
}
}
}
+ // 设置调用方地址
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
+ // 执行调用
servletController.invoke(request, response);
}
--
GitLab