diff --git a/dubbo-demo/dubbo-http-demo-consumer/pom.xml b/dubbo-demo/dubbo-http-demo-consumer/pom.xml index 7d30d8cb253cf5d6d3fb38776c8c6033697d91e9..f763e83caeff47cda177174c4dca7ec369790a0e 100644 --- a/dubbo-demo/dubbo-http-demo-consumer/pom.xml +++ b/dubbo-demo/dubbo-http-demo-consumer/pom.xml @@ -52,14 +52,11 @@ limitations under the License. com.alibaba dubbo-cluster - - - - - - - - + + com.alibaba + dubbo-rpc-http + + org.springframework.boot spring-boot-starter-web diff --git a/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java b/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..024383fff688819192885ad600dc1b1351f40432 --- /dev/null +++ b/dubbo-demo/dubbo-http-demo-consumer/src/main/java/com/alibaba/dubbo/demo/provider/HttpConsumer.java @@ -0,0 +1,30 @@ +package com.alibaba.dubbo.demo.provider; + +import com.alibaba.dubbo.demo.HttpDemoService; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class HttpConsumer { + + public static void main(String[] args) { + System.setProperty("java.net.preferIPv4Stack", "true"); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); + context.start(); + HttpDemoService demoService = (HttpDemoService) context.getBean("demoService"); // get remote service proxy + + while (true) { + try { + Thread.sleep(1000); + + String hello = demoService.hello("world"); // call remote method + System.out.println(hello); // get result + + + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + + + } + } + +} diff --git a/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml b/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml similarity index 64% rename from dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml rename to dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml index fb8a97857ccaa7330250bdab38b221095dbaeea1..1fb64a834452292f52199e6eb2613ce54a33fa11 100644 --- a/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-provider.xml +++ b/dubbo-demo/dubbo-http-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml @@ -21,21 +21,17 @@ limitations under the License. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> - - - - - - - - - - - - - - - - + + + + + + + + + + \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java index a458a68e015750d780959949dc87b976b6bf8240..78c6afbbd4b3b2ae3bc258ab3c2905be6a1f0a1f 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java @@ -65,16 +65,27 @@ public class RpcContext { private final Map attachments = new HashMap(); private final Map values = new HashMap(); + /** + * 异步调用 Future + */ private Future future; private List urls; - + /** + * URL 对象 + */ private URL url; - + /** + * 方法名 + */ private String methodName; - + /** + * 参数类型数组 + */ private Class[] parameterTypes; - + /** + * 参数值数组 + */ private Object[] arguments; /** * 服务消费者地址 diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java index 76d9a47c1aaeb8880851af3a933c31a16fb7c4cc..6de2687f32397b11fb8af1587a99ac22612d04cf 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java @@ -75,6 +75,7 @@ public abstract class AbstractProtocol implements Protocol { return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); } + @Override public void destroy() { for (Invoker invoker : invokers) { if (invoker != null) { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java index 9eb3d8f0de41e6e7fd86c56ebf90edba2d9cb44e..ffcd2b52188fad33974364cc93cd4781e1a5ebb0 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java @@ -32,11 +32,18 @@ import java.util.concurrent.CopyOnWriteArrayList; /** * AbstractProxyProtocol + * + * Proxy 协议抽象类 */ public abstract class AbstractProxyProtocol extends AbstractProtocol { + /** + * 需要抛出的异常类集合,详见 {@link #refer(Class, URL)} 方法。 + */ private final List> rpcExceptions = new CopyOnWriteArrayList>(); - + /** + * ProxyFactory 对象 + */ private ProxyFactory proxyFactory; public AbstractProxyProtocol() { @@ -60,18 +67,27 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol { this.proxyFactory = proxyFactory; } + @Override @SuppressWarnings("unchecked") public Exporter export(final Invoker invoker) throws RpcException { + // 获得服务键 final String uri = serviceKey(invoker.getUrl()); + // 获得 Exporter 对象。若已经暴露,直接返回。 Exporter exporter = (Exporter) exporterMap.get(uri); if (exporter != null) { return exporter; } + // 执行暴露服务 final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl()); + // 创建 Exporter 对象 exporter = new AbstractExporter(invoker) { + + @Override public void unexport() { + // 取消暴露 super.unexport(); exporterMap.remove(uri); + // 执行取消暴露的回调 if (runnable != null) { try { runnable.run(); @@ -80,18 +96,26 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol { } } } + }; + // 添加到 Exporter 集合 exporterMap.put(uri, exporter); return exporter; } + @Override public Invoker refer(final Class type, final URL url) throws RpcException { - final Invoker tagert = proxyFactory.getInvoker(doRefer(type, url), type, url); + // 执行引用服务 + final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url); + // 创建 Invoker 对象 Invoker invoker = new AbstractInvoker(type, url) { + @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { - Result result = tagert.invoke(invocation); + // 调用 + Result result = target.invoke(invocation); + // 若返回结果带有异常,并且需要抛出,则抛出异常。 Throwable e = result.getException(); if (e != null) { for (Class rpcException : rpcExceptions) { @@ -102,15 +126,19 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol { } return result; } catch (RpcException e) { + // 若是未知异常,获得异常对应的错误码 if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) { e.setCode(getErrorCode(e.getCause())); } throw e; } catch (Throwable e) { + // 抛出 RpcException 异常 throw getRpcException(type, url, invocation, e); } } + }; + // 添加到 Invoker 集合。 invokers.add(invoker); return invoker; } @@ -130,12 +158,37 @@ public abstract class AbstractProxyProtocol extends AbstractProtocol { return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); } + /** + * 获得异常对应的错误码 + * + * @param e 异常 + * @return 错误码 + */ protected int getErrorCode(Throwable e) { return RpcException.UNKNOWN_EXCEPTION; } + /** + * 执行暴露,并返回取消暴露的回调 Runnable + * + * @param impl 服务 Proxy 对象 + * @param type 服务接口 + * @param url URL + * @param 服务接口 + * @return 消暴露的回调 Runnable + * @throws RpcException 当发生异常 + */ protected abstract Runnable doExport(T impl, Class type, URL url) throws RpcException; + /** + * 执行引用,并返回调用远程服务的 Service 对象 + * + * @param type 服务接口 + * @param url URL + * @param 服务接口 + * @return 调用远程服务的 Service 对象 + * @throws RpcException 当发生异常 + */ protected abstract T doRefer(Class type, URL url) throws RpcException; } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java index f572ad0f165737cbe997dfd0b41e0366a2a4204e..965c6e6d4e790a487bf85fd2cf2b2845f6d81416 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/support/MockInvoker.java @@ -54,7 +54,7 @@ final public class MockInvoker implements Invoker { } public static Object parseMockValue(String mock, Type[] returnTypes) throws Exception { - Object value = null; + Object value; if ("empty".equals(mock)) { value = ReflectUtils.getEmptyObject(returnTypes != null && returnTypes.length > 0 ? (Class) returnTypes[0] : null); } else if ("null".equals(mock)) { diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java index 82e1dd70ed422473dbdfab94b6f21a1e15eba8f6..499c3e826c9e1fad529388b4b98c3af68ffbdfcb 100644 --- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java +++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java @@ -42,13 +42,26 @@ import java.util.concurrent.ConcurrentHashMap; /** * http rpc support. + * + * Hessian 协议实现类 */ public class HessianProtocol extends AbstractProxyProtocol { + /** + * Http 服务器集合 + * + * key:ip:port + */ private final Map serverMap = new ConcurrentHashMap(); - + /** + * Spring HttpInvokerServiceExporter 集合 + * + * key:path 服务名 + */ private final Map skeletonMap = new ConcurrentHashMap(); - + /** + * HttpBinder$Adaptive 对象 + */ private HttpBinder httpBinder; public HessianProtocol() { @@ -63,16 +76,21 @@ public class HessianProtocol extends AbstractProxyProtocol { return 80; } + @Override protected Runnable doExport(T impl, Class type, URL url) throws RpcException { + // 获得服务器地址 String addr = getAddr(url); + // 获得 HttpServer 对象。若不存在,进行创建。 HttpServer server = serverMap.get(addr); if (server == null) { - server = httpBinder.bind(url, new HessianHandler()); + server = httpBinder.bind(url, new HessianHandler()); // HessianHandler serverMap.put(addr, server); } + // 添加到 skeletonMap 中 final String path = url.getAbsolutePath(); HessianSkeleton skeleton = new HessianSkeleton(impl, type); skeletonMap.put(path, skeleton); + // 返回取消暴露的回调 Runnable return new Runnable() { public void run() { skeletonMap.remove(path); @@ -80,21 +98,27 @@ public class HessianProtocol extends AbstractProxyProtocol { }; } + @Override @SuppressWarnings("unchecked") protected T doRefer(Class serviceType, URL url) throws RpcException { + // 创建 HessianProxyFactory 对象 HessianProxyFactory hessianProxyFactory = new HessianProxyFactory(); + // 创建连接器工厂为 HttpClientConnectionFactory 对象,即 Apache HttpClient String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT); if ("httpclient".equals(client)) { hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory()); } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) { throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!"); } + // 设置超时时间 int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); hessianProxyFactory.setConnectTimeout(timeout); hessianProxyFactory.setReadTimeout(timeout); + // 创建 Service Proxy 对象 return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader()); } + @Override protected int getErrorCode(Throwable e) { if (e instanceof HessianConnectionException) { if (e.getCause() != null) { @@ -110,8 +134,11 @@ public class HessianProtocol extends AbstractProxyProtocol { return super.getErrorCode(e); } + @Override public void destroy() { + // 销毁 super.destroy(); + // 销毁 HttpServer for (String key : new ArrayList(serverMap.keySet())) { HttpServer server = serverMap.remove(key); if (server != null) { @@ -129,12 +156,16 @@ public class HessianProtocol extends AbstractProxyProtocol { private class HessianHandler implements HttpHandler { + @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { String uri = request.getRequestURI(); + // 获得 HessianSkeleton 对象 HessianSkeleton skeleton = skeletonMap.get(uri); + // 必须是 POST 请求 if (!request.getMethod().equalsIgnoreCase("POST")) { response.setStatus(500); + // 执行调用 } else { RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); try { diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java index 52ac3f5d9bf96624ac88f8ee408bf15e095ed487..8d35f31ad994c71cafbae616472314ec5f5bdd3c 100644 --- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java +++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnection.java @@ -34,6 +34,9 @@ import java.net.URL; */ public class HttpClientConnection implements HessianConnection { + /** + * Apache HttpClient + */ private final HttpClient httpClient; private final ByteArrayOutputStream output; @@ -48,35 +51,43 @@ public class HttpClientConnection implements HessianConnection { this.request = new HttpPost(url.toString()); } + @Override public void addHeader(String key, String value) { request.addHeader(new BasicHeader(key, value)); } + @Override public OutputStream getOutputStream() throws IOException { return output; } + @Override public void sendRequest() throws IOException { request.setEntity(new ByteArrayEntity(output.toByteArray())); this.response = httpClient.execute(request); } + @Override public int getStatusCode() { return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode(); } + @Override public String getStatusMessage() { return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase(); } + @Override public String getContentEncoding() { return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue(); } + @Override public InputStream getInputStream() throws IOException { return response == null || response.getEntity() == null ? null : response.getEntity().getContent(); } + @Override public void close() throws IOException { HttpPost request = this.request; if (request != null) { @@ -84,6 +95,7 @@ public class HttpClientConnection implements HessianConnection { } } + @Override public void destroy() throws IOException { } diff --git a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java index 7a366e589fdac171da6178d89342150c387e61bb..54e977585ec33432c1595cd2f7b55c677e5c21b7 100644 --- a/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java +++ b/dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HttpClientConnectionFactory.java @@ -23,7 +23,6 @@ import org.apache.http.client.HttpClient; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.params.HttpConnectionParams; -import java.io.IOException; import java.net.URL; /** @@ -31,15 +30,20 @@ import java.net.URL; */ public class HttpClientConnectionFactory implements HessianConnectionFactory { + /** + * Apache HttpClient + */ private final HttpClient httpClient = new DefaultHttpClient(); + @Override public void setHessianProxyFactory(HessianProxyFactory factory) { HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout()); HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout()); } - public HessianConnection open(URL url) throws IOException { - return new HttpClientConnection(httpClient, url); + @Override + public HessianConnection open(URL url) { + return new HttpClientConnection(httpClient, url); // HttpClientConnection } } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java b/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java index 5593c7ae83d3d7b62b5c83a6c10dc8eddca209db..4e1eb0dd7fb0b20ec3f5ac5e334052296ab8aa43 100644 --- a/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java +++ b/dubbo-rpc/dubbo-rpc-http/src/main/java/com/alibaba/dubbo/rpc/protocol/http/HttpProtocol.java @@ -42,15 +42,30 @@ import java.util.concurrent.ConcurrentHashMap; /** * HttpProtocol + * + * HTTP 协议实现类 */ public class HttpProtocol extends AbstractProxyProtocol { + /** + * 默认服务器端口 + */ public static final int DEFAULT_PORT = 80; - + /** + * Http 服务器集合 + * + * key:ip:port + */ private final Map serverMap = new ConcurrentHashMap(); - + /** + * Spring HttpInvokerServiceExporter 集合 + * + * key:path 服务名 + */ private final Map skeletonMap = new ConcurrentHashMap(); - + /** + * HttpBinder$Adaptive 对象 + */ private HttpBinder httpBinder; public HttpProtocol() { @@ -65,13 +80,17 @@ public class HttpProtocol extends AbstractProxyProtocol { return DEFAULT_PORT; } + @Override protected Runnable doExport(final T impl, Class type, URL url) throws RpcException { + // 获得服务器地址 String addr = getAddr(url); + // 获得 HttpServer 对象。若不存在,进行创建。 HttpServer server = serverMap.get(addr); if (server == null) { - server = httpBinder.bind(url, new InternalHandler()); + server = httpBinder.bind(url, new InternalHandler()); // InternalHandler serverMap.put(addr, server); } + // 创建 HttpInvokerServiceExporter 对象 final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter(); httpServiceExporter.setServiceInterface(type); httpServiceExporter.setService(impl); @@ -80,8 +99,10 @@ public class HttpProtocol extends AbstractProxyProtocol { } catch (Exception e) { throw new RpcException(e.getMessage(), e); } + // 添加到 skeletonMap 中 final String path = url.getAbsolutePath(); skeletonMap.put(path, httpServiceExporter); + // 返回取消暴露的回调 Runnable return new Runnable() { public void run() { skeletonMap.remove(path); @@ -89,11 +110,14 @@ public class HttpProtocol extends AbstractProxyProtocol { }; } + @Override @SuppressWarnings("unchecked") protected T doRefer(final Class serviceType, final URL url) throws RpcException { + // 创建 HttpInvokerProxyFactoryBean 对象 final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean(); httpProxyFactoryBean.setServiceUrl(url.toIdentityString()); httpProxyFactoryBean.setServiceInterface(serviceType); + // 创建执行器 SimpleHttpInvokerRequestExecutor 对象 String client = url.getParameter(Constants.CLIENT_KEY); if (client == null || client.length() == 0 || "simple".equals(client)) { SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() { @@ -105,6 +129,7 @@ public class HttpProtocol extends AbstractProxyProtocol { } }; httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); + // 创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象 } else if ("commons".equals(client)) { HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor(); httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); @@ -113,9 +138,12 @@ public class HttpProtocol extends AbstractProxyProtocol { throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons"); } httpProxyFactoryBean.afterPropertiesSet(); + // 返回 HttpInvokerProxyFactoryBean 对象 return (T) httpProxyFactoryBean.getObject(); } + @Override + @SuppressWarnings("Duplicates") protected int getErrorCode(Throwable e) { if (e instanceof RemoteAccessException) { e = e.getCause(); @@ -135,12 +163,15 @@ public class HttpProtocol extends AbstractProxyProtocol { private class InternalHandler implements HttpHandler { - public void handle(HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { + @Override + public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException { String uri = request.getRequestURI(); + // 获得 HttpInvokerServiceExporter 对象 HttpInvokerServiceExporter skeleton = skeletonMap.get(uri); + // 必须是 POST 请求 if (!request.getMethod().equalsIgnoreCase("POST")) { response.setStatus(500); + // 执行调用 } else { RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); try { diff --git a/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java b/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java index ff0bc9702d885ac54374f9d58a0d047a31c72f8a..126eb35e8de42f30bdbc1c73a0877a388b625297 100644 --- a/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java +++ b/dubbo-rpc/dubbo-rpc-memcached/src/main/java/com/alibaba/dubbo/rpc/protocol/memcached/MemcachedProtocol.java @@ -49,12 +49,15 @@ public class MemcachedProtocol extends AbstractProtocol { return DEFAULT_PORT; } + @Override public Exporter export(final Invoker invoker) throws RpcException { throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl()); } + @Override public Invoker refer(final Class type, final URL url) throws RpcException { try { + // 创建 MemcachedClient 对象 String address = url.getAddress(); String backup = url.getParameter(Constants.BACKUP_KEY); if (backup != null && backup.length() > 0) { @@ -62,30 +65,38 @@ public class MemcachedProtocol extends AbstractProtocol { } MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address)); final MemcachedClient memcachedClient = builder.build(); + + // 处理方法名的映射 final int expiry = url.getParameter("expiry", 0); final String get = url.getParameter("get", "get"); final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker(type, url) { + + @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { + // Memcached get 指令 if (get.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0]))); + // Memcached set 指令 } else if (set.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]); return new RpcResult(); + // Memcached delele 指令 } else if (delete.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } memcachedClient.delete(String.valueOf(invocation.getArguments()[0])); return new RpcResult(); + // 不支持的指令,抛出异常 } else { throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service."); } @@ -100,14 +111,18 @@ public class MemcachedProtocol extends AbstractProtocol { } } + @Override public void destroy() { + // 标记销毁 super.destroy(); + // 关闭 MemcachedClient try { memcachedClient.shutdown(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } + }; } catch (Throwable t) { throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); diff --git a/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java b/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java index bd0aad38eefdd58a57a8d9eb820610784dd28b7a..794bf5c75957c2e97a9216f71917a26fa48765a7 100644 --- a/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java +++ b/dubbo-rpc/dubbo-rpc-redis/src/main/java/com/alibaba/dubbo/rpc/protocol/redis/RedisProtocol.java @@ -56,6 +56,7 @@ public class RedisProtocol extends AbstractProtocol { return DEFAULT_PORT; } + @Override public Exporter export(final Invoker invoker) throws RpcException { throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl()); } @@ -64,8 +65,10 @@ public class RedisProtocol extends AbstractProtocol { return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter(Constants.SERIALIZATION_KEY, "java")); } + @Override public Invoker refer(final Class type, final URL url) throws RpcException { try { + // 创建 GenericObjectPoolConfig 对象,设置配置 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); config.setTestOnReturn(url.getParameter("test.on.return", false)); @@ -86,46 +89,64 @@ public class RedisProtocol extends AbstractProtocol { config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); + // 创建 JedisPool 对象 final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT), url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); + + // 处理方法名的映射 final int expiry = url.getParameter("expiry", 0); final String get = url.getParameter("get", "get"); final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); + + // 创建 Invoker 对象 return new AbstractInvoker(type, url) { - protected Result doInvoke(Invocation invocation) throws Throwable { + + @Override + protected Result doInvoke(Invocation invocation) { Jedis resource = null; try { + // 获得 Redis Resource resource = jedisPool.getResource(); - + // Redis get 指令 if (get.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } + // 获得值 byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } + // 反序列化 ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); + // 返回结果 return new RpcResult(oin.readObject()); + // Redis set/put 指令 } else if (set.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } + // 序列化 byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes(); ByteArrayOutputStream output = new ByteArrayOutputStream(); ObjectOutput value = getSerialization(url).serialize(url, output); value.writeObject(invocation.getArguments()[1]); + // 设置值 resource.set(key, output.toByteArray()); if (expiry > 1000) { resource.expire(key, expiry / 1000); } + // 返回结果 return new RpcResult(); + // Redis remote/delete 指令 } else if (delete.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } + // 删除值 resource.del(String.valueOf(invocation.getArguments()[0]).getBytes()); + // 返回结果 return new RpcResult(); } else { throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service."); @@ -141,6 +162,7 @@ public class RedisProtocol extends AbstractProtocol { } throw re; } finally { + // 归还 Redis Resource if (resource != null) { try { jedisPool.returnResource(resource); @@ -151,14 +173,18 @@ public class RedisProtocol extends AbstractProtocol { } } + @Override public void destroy() { + // 标记销毁 super.destroy(); + // 销毁 Redis Pool try { jedisPool.destroy(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } + }; } catch (Throwable t) { throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); diff --git a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java index 20f4e992ec855b912121f65f67d7bd3c80af5116..bf3b3d1940bf0d6dd70c38d272149a29129d0723 100644 --- a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiProtocol.java @@ -35,9 +35,14 @@ import java.rmi.RemoteException; /** * RmiProtocol. + * + * RMI 协议实现类 */ public class RmiProtocol extends AbstractProxyProtocol { + /** + * 默认端口 + */ public static final int DEFAULT_PORT = 1099; public RmiProtocol() { @@ -48,7 +53,9 @@ public class RmiProtocol extends AbstractProxyProtocol { return DEFAULT_PORT; } + @Override protected Runnable doExport(final T impl, Class type, URL url) throws RpcException { + // 创建 RmiServiceExporter 对象 final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter(); rmiServiceExporter.setRegistryPort(url.getPort()); rmiServiceExporter.setServiceName(url.getPath()); @@ -59,6 +66,7 @@ public class RmiProtocol extends AbstractProxyProtocol { } catch (RemoteException e) { throw new RpcException(e.getMessage(), e); } + // 返回取消暴露的回调 Runnable return new Runnable() { public void run() { try { @@ -70,10 +78,13 @@ public class RmiProtocol extends AbstractProxyProtocol { }; } + @Override @SuppressWarnings("unchecked") protected T doRefer(final Class serviceType, final URL url) throws RpcException { + // 创建 RmiProxyFactoryBean 对象 final RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean(); // RMI needs extra parameter since it uses customized remote invocation object + // RMI传输时使用自定义的远程执行对象,从而传递额外的参数 if (url.getParameter(Constants.DUBBO_VERSION_KEY, Version.getVersion()).equals(Version.getVersion())) { // Check dubbo version on provider, this feature only support rmiProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() { @@ -82,15 +93,18 @@ public class RmiProtocol extends AbstractProxyProtocol { } }); } + // 设置相关参数 rmiProxyFactoryBean.setServiceUrl(url.toIdentityString()); rmiProxyFactoryBean.setServiceInterface(serviceType); rmiProxyFactoryBean.setCacheStub(true); rmiProxyFactoryBean.setLookupStubOnStartup(true); rmiProxyFactoryBean.setRefreshStubOnConnectFailure(true); rmiProxyFactoryBean.afterPropertiesSet(); + // 创建 Service Proxy 对象 return (T) rmiProxyFactoryBean.getObject(); } + @Override protected int getErrorCode(Throwable e) { if (e instanceof RemoteAccessException) { e = e.getCause(); diff --git a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java index 26e0ec8ddb8cd3bead1143322d822715b7d0a790..ecee685e97f54d5f2c084f6e97d62356f8b5f255 100644 --- a/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java +++ b/dubbo-rpc/dubbo-rpc-rmi/src/main/java/com/alibaba/dubbo/rpc/protocol/rmi/RmiRemoteInvocation.java @@ -25,11 +25,15 @@ import java.util.HashMap; import java.util.Map; public class RmiRemoteInvocation extends RemoteInvocation { + private static final long serialVersionUID = 1L; + private static final String dubboAttachmentsAttrName = "dubbo.attachments"; /** * executed on consumer side + * + * 构造将在消费端执行 */ public RmiRemoteInvocation(MethodInvocation methodInvocation) { super(methodInvocation); @@ -40,6 +44,8 @@ public class RmiRemoteInvocation extends RemoteInvocation { * Need to restore context on provider side (Though context will be overridden by Invocation's attachment * when ContextFilter gets executed, we will restore the attachment when Invocation is constructed, check more * from {@link com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler} + * + * 服务端执行时,重新放入上下文(虽然这时上下文在ContextFilter执行时将被Invocation的attachments覆盖,我们在Invocation构造时还原attachments, see InvokerInvocationHandler) */ @SuppressWarnings("unchecked") @Override diff --git a/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java b/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java index 04450cad35cd0d35131861a2bad14b90030f5e89..19258b8159e2ef99aeb859d70776abf1fa7d8572 100644 --- a/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java +++ b/dubbo-rpc/dubbo-rpc-webservice/src/main/java/com/alibaba/dubbo/rpc/protocol/webservice/WebServiceProtocol.java @@ -50,17 +50,33 @@ import java.util.concurrent.ConcurrentHashMap; /** * WebServiceProtocol. + * + * WebService 协议实现类 */ public class WebServiceProtocol extends AbstractProxyProtocol { + /** + * 默认服务器端口 + */ public static final int DEFAULT_PORT = 80; - + /** + * Http 服务器集合 + * + * key:ip:port + */ private final Map serverMap = new ConcurrentHashMap(); - + /** + * 《我眼中的CXF之Bus》http://jnn.iteye.com/blog/94746 + * 《CXF BUS》https://blog.csdn.net/chen_fly2011/article/details/56664908 + */ private final ExtensionManagerBus bus = new ExtensionManagerBus(); - + /** + * + */ private final HTTPTransportFactory transportFactory = new HTTPTransportFactory(); - + /** + * HttpBinder$Adaptive 对象 + */ private HttpBinder httpBinder; public WebServiceProtocol() { @@ -76,13 +92,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol { return DEFAULT_PORT; } + @Override protected Runnable doExport(T impl, Class type, URL url) throws RpcException { + // 获得服务器地址 String addr = getAddr(url); + // 获得 HttpServer 对象。若不存在,进行创建。 HttpServer httpServer = serverMap.get(addr); if (httpServer == null) { - httpServer = httpBinder.bind(url, new WebServiceHandler()); + httpServer = httpBinder.bind(url, new WebServiceHandler()); // WebServiceHandler serverMap.put(addr, httpServer); } + // 创建 ServerFactoryBean 对象 final ServerFactoryBean serverFactoryBean = new ServerFactoryBean(); serverFactoryBean.setAddress(url.getAbsolutePath()); serverFactoryBean.setServiceClass(type); @@ -90,6 +110,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol { serverFactoryBean.setBus(bus); serverFactoryBean.setDestinationFactory(transportFactory); serverFactoryBean.create(); + // 返回取消暴露的回调 Runnable return new Runnable() { public void run() { serverFactoryBean.destroy(); @@ -97,13 +118,17 @@ public class WebServiceProtocol extends AbstractProxyProtocol { }; } + @Override @SuppressWarnings("unchecked") protected T doRefer(final Class serviceType, final URL url) throws RpcException { + // 创建 ClientProxyFactoryBean 对象 ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean(); proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString()); proxyFactoryBean.setServiceClass(serviceType); proxyFactoryBean.setBus(bus); + // 创建 Service Proxy 对象 T ref = (T) proxyFactoryBean.create(); + // 设置超时相关属性 Client proxy = ClientProxy.getClient(ref); HTTPConduit conduit = (HTTPConduit) proxy.getConduit(); HTTPClientPolicy policy = new HTTPClientPolicy(); @@ -113,6 +138,7 @@ public class WebServiceProtocol extends AbstractProxyProtocol { return ref; } + @Override protected int getErrorCode(Throwable e) { if (e instanceof Fault) { e = e.getCause(); @@ -129,7 +155,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol { private volatile ServletController servletController; + @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + // 创建 ServletController 对象,设置使用 DispatcherServlet 。 if (servletController == null) { HttpServlet httpServlet = DispatcherServlet.getInstance(); if (httpServlet == null) { @@ -142,7 +170,9 @@ public class WebServiceProtocol extends AbstractProxyProtocol { } } } + // 设置调用方地址 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); + // 执行调用 servletController.invoke(request, response); }