提交 1a71da59 编写于 作者: W william.liangf

增加hessian

git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@80 1a56cb94-b969-4eaa-88fa-be21384802f2
上级 d5e9a46c
......@@ -37,8 +37,8 @@ import javassist.NotFoundException;
* @author qian.lei
*/
public final class ReflectUtils
{
public final class ReflectUtils {
/**
* void(V).
*/
......@@ -754,6 +754,20 @@ public final class ReflectUtils
}
return targetConstructor;
}
public static boolean isInstance(Object obj, String interfaceClazzName) {
for (Class<?> clazz = obj.getClass();
clazz != null && !clazz.equals(Object.class);
clazz = clazz.getSuperclass()) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> itf : interfaces) {
if (itf.getName().equals(interfaceClazzName)) {
return true;
}
}
}
return false;
}
private ReflectUtils(){}
}
\ No newline at end of file
......@@ -17,6 +17,7 @@ package com.alibaba.dubbo.registry.multicast;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
......@@ -26,6 +27,7 @@ import java.util.List;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.AbstractRegistry;
......@@ -52,6 +54,10 @@ public class MulticastRegistry extends AbstractRegistry {
private MulticastSocket mutilcastSocket;
private InetSocketAddress datagramAddress;
private DatagramSocket datagramSocket;
public MulticastRegistry(URL url) {
super(url);
if (! isMulticastAddress(url.getHost())) {
......@@ -69,7 +75,11 @@ public class MulticastRegistry extends AbstractRegistry {
while (true) {
try {
mutilcastSocket.receive(recv);
MulticastRegistry.this.receive(new String(recv.getData()).trim(), (InetSocketAddress) recv.getSocketAddress());
String msg = new String(recv.getData()).trim();
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + recv.getSocketAddress());
}
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
......@@ -81,6 +91,41 @@ public class MulticastRegistry extends AbstractRegistry {
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
int port = 0;
String udp = url.getParameter("udp");
if (udp == null || udp.length() == 0 || "true".equals(udp)) {
port = NetUtils.getAvailablePort();
} else if (! "false".equals(udp)) {
port = Integer.parseInt(udp);
}
if (port > 0) {
try {
datagramAddress = new InetSocketAddress(NetUtils.getLocalHost(), url.getIntParameter("udp", NetUtils.getAvailablePort()));
datagramSocket = new DatagramSocket(datagramAddress);
Thread thread = new Thread(new Runnable() {
public void run() {
byte[] buf = new byte[1024];
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (true) {
try {
datagramSocket.receive(recv);
String msg = new String(recv.getData()).trim();
if (logger.isInfoEnabled()) {
logger.info("Receive udp message: " + msg + " from " + recv.getSocketAddress());
}
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}, "MulticastRegistryUDP");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
private static boolean isMulticastAddress(String ip) {
......@@ -123,10 +168,15 @@ public class MulticastRegistry extends AbstractRegistry {
notify(service, urls);
}
} else if (msg.startsWith(SUBSCRIBE)) {
String service = URL.valueOf(msg.substring(SUBSCRIBE.length()).trim()).getServiceKey();
URL url = URL.valueOf(msg.substring(SUBSCRIBE.length()).trim());
String service = url.getServiceKey();
if (getRegistered().containsKey(service)) {
for (URL url : getRegistered().get(service)) {
send(REGISTER + " " + url.toFullString());
for (URL u : getRegistered().get(service)) {
if (datagramSocket != null && "udp".equals(url.getProtocol())) {
sendTo(REGISTER + " " + u.toFullString(), url);
} else {
send(REGISTER + " " + u.toFullString());
}
}
}
} else if (msg.startsWith(UNSUBSCRIBE)) {
......@@ -134,10 +184,25 @@ public class MulticastRegistry extends AbstractRegistry {
}
private void send(String msg) {
DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), mutilcastAddress, mutilcastSocket.getLocalPort());
if (logger.isInfoEnabled()) {
logger.info("Send multicast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastSocket.getLocalPort());
}
try {
DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), mutilcastAddress, mutilcastSocket.getLocalPort());
mutilcastSocket.send(hi);
} catch (IOException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private void sendTo(String msg, URL url) {
if (logger.isInfoEnabled()) {
logger.info("Send udp message: " + msg + " to " + url.getAddress());
}
try {
DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), InetAddress.getByName(url.getHost()), url.getPort());
datagramSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
......@@ -153,6 +218,11 @@ public class MulticastRegistry extends AbstractRegistry {
public void subscribe(String service, URL url, NotifyListener listener) {
super.subscribe(service, url, listener);
if (datagramAddress != null) {
url = url.setProtocol("udp").setHost(datagramAddress.getAddress().getHostAddress()).setPort(datagramAddress.getPort());
} else {
url = url.setProtocol("multicast").setHost(mutilcastAddress.getHostAddress()).setPort(mutilcastSocket.getLocalPort());
}
send(SUBSCRIBE + " " + url.toFullString());
synchronized (this) {
try {
......@@ -163,6 +233,11 @@ public class MulticastRegistry extends AbstractRegistry {
}
public void unsubscribe(String service, URL url, NotifyListener listener) {
if (datagramAddress != null) {
url = url.setProtocol("udp").setHost(datagramAddress.getAddress().getHostAddress()).setPort(datagramAddress.getPort());
} else {
url = url.setProtocol("multicast").setHost(mutilcastAddress.getHostAddress()).setPort(mutilcastSocket.getLocalPort());
}
send(UNSUBSCRIBE + " " + url.toFullString());
}
......
<!--
- Copyright 1999-2011 Alibaba Group.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-parent</artifactId>
<version>2.0.8-SNAPSHOT</version>
</parent>
<artifactId>dubbo-remoting-http</artifactId>
<packaging>jar</packaging>
<name>Dubbo Http Remoting Module</name>
<description>The http remoting module of dubbo project</description>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.alibaba.dubbo.remoting.http;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* http invocation handler.
*
* @author william.liangf
*/
public interface HttpProcessor {
/**
* invoke.
*
* @param request request.
* @param response response.
* @throws IOException
* @throws ServletException
*/
public abstract void invoke(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException;
}
package com.alibaba.dubbo.remoting.http;
public interface HttpServer {
void start();
void stop();
int getPort();
}
package com.alibaba.dubbo.remoting.http;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.ServletHandler;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
public class JettyHttpServer implements HttpServer {
private static final Logger logger = LoggerFactory.getLogger(JettyHttpServer.class);
private String host;
private int port;
private int threads;
private Server server;
public JettyHttpServer(int port, int threads) {
this.port = port;
this.threads = threads;
}
public JettyHttpServer(String host, int port, int threads) {
this.host = host;
this.port = port;
this.threads = threads;
}
public void start() {
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setDaemon(true);
threadPool.setMaxThreads(threads);
threadPool.setMinThreads(threads);
SelectChannelConnector connector = new SelectChannelConnector();
if (NetUtils.isValidLocalHost(host)) {
connector.setHost(host);
}
connector.setPort(port);
ServletHandler handler = new ServletHandler();
ServletHolder holder = handler.addServletWithMapping(ServiceDispatcherServlet.class, "/*");
holder.setInitOrder(1);
server = new Server();
server.setThreadPool(threadPool);
server.addConnector(connector);
server.addHandler(handler);
try {
server.start();
} catch (Exception e) {
throw new IllegalStateException("Failed to start jetty server on " + host + ":" + port + ", cause: " + e.getMessage(), e);
}
}
public void stop() {
if (server != null) {
try {
server.stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
public int getPort() {
return port;
}
}
package com.alibaba.dubbo.remoting.http;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Service dispatcher Servlet.
*
* @author qian.lei
*/
public class ServiceDispatcherServlet extends HttpServlet {
private static final long serialVersionUID = 5766349180380479888L;
private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded";
private static final Map<String, HttpProcessor> processors = new ConcurrentHashMap<String, HttpProcessor>();
public static void addProcessor(int port, String uri, HttpProcessor processor) {
processors.put(key(port, uri), processor);
}
public static void removeProcessor(int port, String uri) {
processors.remove(key(port, uri));
}
private static String key(int port, String uri) {
return port + ":" + (uri.startsWith("/") ? uri : "/" + uri);
}
protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
String uri = request.getRequestURI();
String contentType = request.getContentType();
if (contentType == null || FORM_CONTENT_TYPE.equalsIgnoreCase(contentType)) {
int i = uri.lastIndexOf('/');
if (i >= 0) {
uri = uri.substring(0, i);
}
}
HttpProcessor processor = processors.get(key(request.getLocalPort(), uri));
if( processor == null ) {// service not found.
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
} else {
processor.invoke(request, response);
}
}
}
package com.alibaba.dubbo.remoting.http;
public class ServletHttpServer implements HttpServer {
private final int port;
public ServletHttpServer(int port){
this.port = port;
}
public void start() {
}
public void stop() {
}
public int getPort() {
return port;
}
}
......@@ -52,7 +52,7 @@ public class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
}
@Override
protected Object doInvoke(Invocation invocation) throws Throwable {
protected Result doInvoke(Invocation invocation) throws Throwable {
RpcInvocation inv = new RpcInvocation(invocation.getMethodName(),
invocation.getParameterTypes(), invocation.getArguments(),
invocation.getAttachments());
......@@ -84,7 +84,7 @@ public class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
} catch (Throwable e) { // here is non-biz exception, wrap it.
throw new RpcException(e.getMessage(), e);
}
return result.recreate();
return result;
}
public static class ChannelWrapper extends ClientDelegate {
......
......@@ -52,7 +52,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
}
@Override
protected Object doInvoke(final Invocation invocation) throws Throwable {
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = null;
final String methodName ;
if(Constants.$INVOKE.equals(invocation.getMethodName()) && invocation.getArguments() !=null && invocation.getArguments().length >0 && invocation.getArguments()[0] != null){
......@@ -73,7 +73,6 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
Result result = null ;
try {
// 不可靠异步
boolean isAsync = getUrl().getMethodBooleanParameter(methodName, Constants.ASYNC_KEY);
......@@ -91,18 +90,12 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
return null;
}
RpcContext.getContext().setFuture(null);
result = (Result) currentClient.request(inv, timeout).get();
} catch (RpcException e) {
throw e;
return (Result) currentClient.request(inv, timeout).get();
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e);
} catch (Throwable e) { // here is non-biz exception, wrap it.
throw new RpcException(e.getMessage(), e);
}
//attention: recreate can not in try-catch block.
return result == null ? null: result.recreate();
}
@Override
......
......@@ -45,6 +45,7 @@ import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcConstants;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.protocol.AbstractProtocol;
......@@ -123,8 +124,8 @@ public class DubboProtocol extends AbstractProtocol {
return null;
}
}
return exporter.invoke(inv, channel.getRemoteAddress());
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return exporter.getInvoker().invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
......
......@@ -31,6 +31,11 @@
<artifactId>dubbo-rpc</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting-http</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
......
package com.alibaba.dubbo.rpc.protocol.hessian;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.Extension;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.http.HttpServer;
import com.alibaba.dubbo.remoting.http.JettyHttpServer;
import com.alibaba.dubbo.remoting.http.ServiceDispatcherServlet;
import com.alibaba.dubbo.remoting.http.ServletHttpServer;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProtocol;
/**
* http rpc support.
*
* @author qianlei
*/
@Extension("hessian")
public class HessianProtocol extends AbstractProtocol {
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
private ProxyFactory proxyFactory;
public void setProxyFactory(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
public int getDefaultPort() {
return 80;
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
final String uri = url.getPath(); // service uri also exporter cache key.
int threads = url.getIntParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
String addr = url.getHost() + ":" + url.getPort();
HttpServer server = serverMap.get(addr);
if (server == null) {
String type = url.getParameter(Constants.SERVER_KEY, "jetty");
if ("servlet".equals(type)) {
server = new ServletHttpServer(url.getPort());
} else if ("jetty".equals(type)) {
// 和Dubbo协议一样,总是绑定到0.0.0.0上
server = new JettyHttpServer(url.getPort(), threads);
} else {
throw new IllegalArgumentException("Unsupported http server " + type
+ ", only support servlet, jetty!");
}
server.start();
serverMap.put(addr, server);
}
HessianRpcExporter<T> exporter = new HessianRpcExporter<T>(invoker) {
public void unexport() {
super.unexport();
exporterMap.remove(uri);
}
};
exporterMap.put(uri, exporter);
ServiceDispatcherServlet.addProcessor(url.getPort(), uri, exporter);
return exporter;
}
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
Invoker<T> invoker = new HessianRpcInvoker<T>(serviceType, url, proxyFactory);
invokers.add(invoker);
return invoker;
}
public void destroy() {
super.destroy();
for (String key : new ArrayList<String>(serverMap.keySet())) {
HttpServer server = serverMap.remove(key);
if (server != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close hessian server 0.0.0.0:" + server.getPort());
}
server.stop();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
}
package com.alibaba.dubbo.rpc.protocol.hessian;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.alibaba.dubbo.remoting.http.HttpProcessor;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.AbstractExporter;
/**
* hessian rpc exporter.
*
* @author qian.lei
*/
public class HessianRpcExporter<T> extends AbstractExporter<T> implements HttpProcessor
{
private HessianSkeletonInvoker mSkeleton;
public HessianRpcExporter(Invoker<T> invoker)
{
super(invoker);
mSkeleton = new HessianSkeletonInvoker(invoker.getInterface(), this);
}
public void invoke(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if( request.getMethod().equalsIgnoreCase("POST") == false )
{
response.setStatus(500);
}
else
{
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try
{
mSkeleton.invoke(request.getInputStream(), response.getOutputStream());
}
catch(Throwable e)
{
throw new ServletException(e);
}
}
}
}
package com.alibaba.dubbo.rpc.protocol.hessian;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.protocol.AbstractInvoker;
import com.caucho.hessian.HessianException;
import com.caucho.hessian.client.HessianProxyFactory;
/**
* hessian rpc invoker.
*
* @author qianlei
*/
public class HessianRpcInvoker<T> extends AbstractInvoker<T> {
protected static final String HESSIAN_EXCEPTION_PREFIX = HessianException.class.getPackage().getName() + "."; //fix by tony.chenl
protected Invoker<T> invoker;
@SuppressWarnings("unchecked")
public HessianRpcInvoker(Class<T> serviceType, URL url, ProxyFactory proxyFactory){
super(serviceType, url);
int timeout;
String t = url.getParameter(Constants.TIMEOUT_KEY);
if (t != null && t.length() > 0) {
timeout = Integer.parseInt(t);
} else {
timeout = Constants.DEFAULT_TIMEOUT;
}
java.net.URL httpUrl = url.setProtocol("http").toJavaURL();
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
invoker = proxyFactory.getInvoker((T)hessianProxyFactory.create(serviceType, httpUrl, Thread.currentThread().getContextClassLoader()), serviceType, url);
}
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
return invoker.invoke(invocation);
} catch (RpcException e) {
throw e;
} catch (Throwable e) {
//fix by tony.chenl
if (e.getClass().getName().startsWith(HESSIAN_EXCEPTION_PREFIX)) {
throw new RpcException("Failed to invoke remote service: " + getInterface() + ", method: "
+ invocation.getMethodName() + ", cause: " + e.getMessage(), e);
}
return new RpcResult(e);
}
}
}
package com.alibaba.dubbo.rpc.protocol.hessian;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.caucho.hessian.io.AbstractHessianOutput;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import com.caucho.hessian.io.HessianOutput;
import com.caucho.services.server.AbstractSkeleton;
import com.caucho.services.server.ServiceContext;
class HessianSkeletonInvoker extends AbstractSkeleton {
private HessianRpcExporter<?> mExporter;
HessianSkeletonInvoker(Class<?> serviceType, HessianRpcExporter<?> exporter) {
super(serviceType);
mExporter = exporter;
}
public void invoke(InputStream is, OutputStream os) throws Throwable {
Hessian2Input in = new Hessian2Input(is);
// if (this.serializerFactory != null) {
// in.setSerializerFactory(this.serializerFactory);
// }
int code = in.read();
if (code != 'c')
throw new IOException("expected 'c' in hessian input at " + code);
AbstractHessianOutput out;
int major = in.read();
in.read(); // minor version, skip it.
if (major >= 2)
out = new Hessian2Output(os);
else
out = new HessianOutput(os);
// if (this.serializerFactory != null) {
// out.setSerializerFactory(this.serializerFactory);
// }
// see com.alibaba.dubbo.rpc.http.hessian.v3_2_0.hessian.server.HessianSkeleton
ServiceContext context = ServiceContext.getContext();
// backward compatibility for some frameworks that don't read
// the call type first
in.skipOptionalCall();
String header;
while ((header = in.readHeader()) != null) {
Object value = in.readObject();
context.addHeader(header, value);
}
String methodName = in.readMethod();
Method method = getMethod(methodName);
if (method != null) {
} else if ("_hessian_getAttribute".equals(methodName)) {
String attrName = in.readString();
in.completeCall();
String value = null;
if ("java.api.class".equals(attrName))
value = getAPIClassName();
else if ("java.home.class".equals(attrName))
value = getHomeClassName();
else if ("java.object.class".equals(attrName))
value = getObjectClassName();
out.startReply();
out.writeObject(value);
out.completeReply();
out.close();
return;
} else if ((method = HessianUtils.getFrameworkMethod(methodName)) == null) {
out.startReply();
out.writeFault("NoSuchMethodException",
"The service has no method named: " + in.getMethod(), null);
out.completeReply();
out.close();
return;
}
Class<?>[] args = method.getParameterTypes();
Object[] values = new Object[args.length];
for (int i = 0; i < args.length; i++) {
values[i] = in.readObject(args[i]);
}
Object result = null;
try {
RpcInvocation inv = new RpcInvocation(method, values);
result = mExporter.getInvoker().invoke(inv).recreate();
} catch (Throwable e) {
if (e instanceof InvocationTargetException) {
e = ((InvocationTargetException) e).getTargetException();
}
out.startReply();
out.writeFault("ServiceException", e.getMessage(), e);
out.completeReply();
out.close();
return;
}
// The complete call needs to be after the invoke to handle a
// trailing InputStream
in.completeCall();
out.startReply();
out.writeObject(result);
out.completeReply();
out.close();
}
}
package com.alibaba.dubbo.rpc.protocol.hessian;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.dubbo.rpc.service.EchoService;
import com.alibaba.dubbo.rpc.service.GenericService;
/**
*
*
* @author qianlei
*/
public class HessianUtils
{
private final static Method[] FrameworkMethods;
static
{
List<Method> methods = new ArrayList<Method>();
Class<?> c = EchoService.class;
for( Method method : c.getDeclaredMethods() )
methods.add(method);
c = GenericService.class;
for( Method method : c.getDeclaredMethods() )
methods.add(method);
FrameworkMethods = methods.toArray(new Method[0]);
}
private HessianUtils(){}
public static Method getFrameworkMethod(String name)
{
if( name != null )
{
for( Method method : FrameworkMethods )
if( name.startsWith(method.getName()) )
return method;
}
return null;
}
}
com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
\ No newline at end of file
......@@ -20,6 +20,7 @@ import java.util.Map;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
......@@ -42,19 +43,12 @@ class InjvmInvoker<T> extends AbstractInvoker<T> {
this.exporterMap = exporterMap;
}
public Object doInvoke(Invocation invocation) throws Throwable {
public Result doInvoke(Invocation invocation) throws Throwable {
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
Result result;
try {
result = exporter.invoke(invocation, NetUtils.LOCALHOST, 0);
} catch (RpcException e) {
throw e;
} catch (Throwable e) {
throw new RpcException(e);
}
return result.recreate();
RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
return exporter.getInvoker().invoke(invocation);
}
}
\ No newline at end of file
......@@ -35,6 +35,7 @@
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 1999-2101 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.rmi;
import java.lang.reflect.InvocationTargetException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcResult;
/**
* Generic Remote object adapter to rmi invocation handler.
*
* @serial
* @author qian.lei
*/
class RemoteObject2RmiInvocationHandler implements RmiInvocationHandler
{
private Remote mRemote;
private Wrapper mWrapper;
RemoteObject2RmiInvocationHandler(Remote remote, Class<?> type)
{
// check remote object and interface.
if( type.isInterface() == false )
throw new IllegalArgumentException("Service type must be interface. " + type.getName());
if( type.isInstance(remote) == false )
throw new IllegalArgumentException("Remote object must implement interface: " + type.getName());
mRemote = remote;
mWrapper = Wrapper.getWrapper(type);
}
public Result invoke(Invocation inv)
throws RemoteException, NoSuchMethodException, IllegalAccessException, InvocationTargetException
{
RpcResult result = new RpcResult();
try
{
result.setResult(mWrapper.invokeMethod(mRemote, inv.getMethodName(), inv.getParameterTypes(), inv.getArguments()));
}
catch(InvocationTargetException e)
{
Throwable rmiInvocationEx = e.getTargetException();
if(null == rmiInvocationEx) throw e;
if(rmiInvocationEx.getClass().getName().startsWith("java.rmi.")
|| rmiInvocationEx.getClass().getName().startsWith("javax.rmi.")) {
throw new RemoteException("", rmiInvocationEx);
}
result.setException(rmiInvocationEx);
}
return result;
}
}
\ No newline at end of file
......@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.rmi;
package com.alibaba.dubbo.rpc.protocol.rmi;
import java.rmi.Remote;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
......@@ -23,54 +23,46 @@ import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.protocol.AbstractExporter;
/**
* Rmi exporter.
*
* @author qian.lei
*/
public class RmiExporter<T> extends AbstractExporter<T>
{
private static final Logger Log = LoggerFactory.getLogger(RmiExporter.class);
private Remote mRemote;
private Registry mRmiRegistry;
RmiExporter(Invoker<T> invoker) {
super(invoker);
}
public void unexport()
{
super.unexport();
if( mRmiRegistry != null )
{
try
{
// unbind.
mRmiRegistry.unbind(getInvoker().getUrl().getPath());
// unexport.
if( mRemote != null )
UnicastRemoteObject.unexportObject(mRemote, true);
}
catch(Exception e)
{
Log.warn("Unexport rmi object error.", e); //ignore it.
}
mRemote = null;
mRmiRegistry = null;
}
}
void setRmiRegistry(Registry reg)
{
mRmiRegistry = reg;
}
void setRemoteObject(Remote remote)
{
mRemote = remote;
}
}
\ No newline at end of file
/**
* Rmi exporter.
*
* @author qian.lei
*/
public class RmiExporter<T> extends AbstractExporter<T> {
private static final Logger Log = LoggerFactory.getLogger(RmiExporter.class);
private Remote remote;
private Registry registry;
public RmiExporter(Invoker<T> invoker, Remote remote, Registry registry) {
super(invoker);
this.remote = remote;
this.registry = registry;
}
public void unexport() {
super.unexport();
// unexport.
if (remote != null) {
try {
UnicastRemoteObject.unexportObject(remote, true);
} catch (Exception e) {
Log.warn("Unexport rmi object error.", e); //ignore it.
}
remote = null;
}
if (registry != null) {
try {
// unbind.
registry.unbind(getInvoker().getUrl().getPath());
} catch (Exception e) {
Log.warn("Unexport rmi object error.", e); //ignore it.
}
registry = null;
}
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.rmi;
import java.lang.reflect.InvocationTargetException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
/**
* rmi invocation handler.
*
* @serial Don't change the class name.
* @author qianlei
*/
public interface RmiInvocationHandler extends Remote {
/**
* invoke.
*
* @param invocation invocation.
* @return result.
* @throws RemoteException.
* @throws NoSuchMethodException.
* @throws IllegalAccessException.
* @throws InvocationTargetException.
*/
public Result invoke(Invocation invocation) throws RemoteException, NoSuchMethodException,
IllegalAccessException, InvocationTargetException;
}
......@@ -13,93 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.rmi;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
package com.alibaba.dubbo.rpc.protocol.rmi;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.protocol.AbstractInvoker;
/**
* rmi rpc invoker.
*
* @author qian.lei
*/
public class RmiInvoker<T> extends AbstractInvoker<T> {
private RmiInvocationHandler proxy;
static boolean isInstance(Object obj, String interfaceClazzName) {
for(Class<?> clazz = obj.getClass(); clazz != null && !clazz.equals(Object.class);
clazz = clazz.getSuperclass()) {
Class<?>[] interfaces = clazz.getInterfaces();
for(Class<?> itf : interfaces) {
if(itf.getName().equals(interfaceClazzName)) return true;
}
}
return false;
}
public RmiInvoker(Class<T> serviceType, URL url)
{
super(serviceType, url);
try
{
Registry reg = LocateRegistry.getRegistry(url.getHost(), url.getPort());
String path = url.getPath();
if (path == null || path.length() == 0) {
path = serviceType.getName();
}
Remote rmt = reg.lookup(path);
if( rmt instanceof RmiInvocationHandler ) {
// is the Remote wrap type in Dubbo2
proxy = (RmiInvocationHandler)rmt;
}
else if(isInstance(rmt, "org.springframework.remoting.rmi.RmiInvocationHandler")) {
// is the Remote wrap type in spring? (spring rmi is used in Dubbo1)
proxy = new SpringHandler2RmiInvocationHandler((org.springframework.remoting.rmi.RmiInvocationHandler)rmt, serviceType);
}
else
proxy = new RemoteObject2RmiInvocationHandler(rmt, serviceType);
}
catch(RemoteException e)
{
Throwable cause = e.getCause();
boolean isExportedBySpringButNoSpringClass = ClassNotFoundException.class.isInstance(cause)
&& cause.getMessage().contains("org.springframework.remoting.rmi.RmiInvocationHandler");
String msg = String.format("Can not create remote object%s. url = %s",
isExportedBySpringButNoSpringClass ? "(Rmi object is exported by spring rmi but NO spring class org.springframework.remoting.rmi.RmiInvocationHandler at consumer side)" : "",
url);
throw new RpcException(msg, e);
}
catch(NotBoundException e)
{
throw new RpcException("Rmi service not found. url = " + url, e);
}
}
@Override
protected Object doInvoke(Invocation invocation) throws Throwable {
Result result;
try
{
result = proxy.invoke((RpcInvocation) invocation);
}
catch(Throwable e) // here is non-biz exception, wrap it.
{
throw new RpcException(e);
}
return result.recreate();
}
}
\ No newline at end of file
/**
* RmiInvoker.
*
* @author qian.lei
*/
public class RmiInvoker<T> extends AbstractInvoker<T> {
private Invoker<T> invoker;
public RmiInvoker(Invoker<T> invoker) {
super(invoker.getInterface(), invoker.getUrl());
this.invoker = invoker;
}
@Override
protected Result doInvoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
}
/*
* Copyright 1999-2101 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.rmi;
import java.lang.reflect.InvocationTargetException;
import java.rmi.RemoteException;
import org.springframework.remoting.support.RemoteInvocation;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcResult;
/**
*
* @serial
* @author ding.lid
*/
class SpringHandler2RmiInvocationHandler implements RmiInvocationHandler {
private org.springframework.remoting.rmi.RmiInvocationHandler springHandler;
SpringHandler2RmiInvocationHandler(org.springframework.remoting.rmi.RmiInvocationHandler springHandler, Class<?> type)
{
// check remote object and interface.
if( type.isInterface() == false )
throw new IllegalArgumentException("Service type must be interface. " + type.getName());
this.springHandler = springHandler;
}
public Result invoke(Invocation inv)
throws RemoteException, NoSuchMethodException, IllegalAccessException, InvocationTargetException
{
RpcResult result = new RpcResult();
try
{
RemoteInvocation i = new RemoteInvocation();
i.setMethodName(inv.getMethodName());
i.setParameterTypes(inv.getParameterTypes());
i.setArguments(inv.getArguments());
result.setResult(springHandler.invoke(i));
}
catch(InvocationTargetException e)
{
result.setException(e.getTargetException());
}
return result;
}
}
\ No newline at end of file
......@@ -13,98 +13,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol;
import java.net.InetSocketAddress;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
/**
* AbstractExporter.
*
* @author qianlei
* @author william.liangf
*/
public abstract class AbstractExporter<T> implements Exporter<T> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Invoker<T> invoker;
private volatile boolean unexported = false;
public AbstractExporter(Invoker<T> invoker) {
if(invoker == null)
throw new IllegalStateException("service invoker == null");
if(invoker.getInterface() == null)
throw new IllegalStateException("service type == null");
if(invoker.getUrl() == null)
throw new IllegalStateException("service url == null");
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
return invoker;
}
public void unexport() {
if (unexported)
throw new IllegalStateException("The exporter " + this + " unexported!");
unexported = true;
getInvoker().destroy();
}
public String toString() {
return getInvoker().toString();
}
/**
* invoke.
*
* <code>
* Context.getContext().setRemoteAddress(remoteAddress);
* getInvoker().invoke(invocation);
* </code>
*
* @param invocation
* @param remoteAddress
* @return
* @throws RpcException
*/
public Result invoke(Invocation invocation, InetSocketAddress remoteAddress) throws RpcException {
RpcContext.getContext().setRemoteAddress(remoteAddress);
return getInvoker().invoke(invocation);
}
/**
* invoke.
*
* <code>
* Context.getContext().setRemoteAddress(remoteHost, remotePort);
* getInvoker().invoke(invocation);
* </code>
*
* @param invocation
* @param remoteHost
* @param remotePort
* @return
* @throws RpcException
*/
public Result invoke(Invocation invocation, String remoteHost, int remotePort) throws RpcException {
if (remoteHost != null && remoteHost.length() > 0) {
if (remotePort < 0) {
remotePort = 0;
}
RpcContext.getContext().setRemoteAddress(remoteHost, remotePort);
}
return getInvoker().invoke(invocation);
}
}
\ No newline at end of file
package com.alibaba.dubbo.rpc.protocol;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
/**
* AbstractExporter.
*
* @author qianlei
* @author william.liangf
*/
public abstract class AbstractExporter<T> implements Exporter<T> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Invoker<T> invoker;
private volatile boolean unexported = false;
public AbstractExporter(Invoker<T> invoker) {
if (invoker == null)
throw new IllegalStateException("service invoker == null");
if (invoker.getInterface() == null)
throw new IllegalStateException("service type == null");
if (invoker.getUrl() == null)
throw new IllegalStateException("service url == null");
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
return invoker;
}
public void unexport() {
if (unexported) {
throw new IllegalStateException("The exporter " + this + " unexported!");
}
unexported = true;
getInvoker().destroy();
}
public String toString() {
return getInvoker().toString();
}
}
......@@ -113,10 +113,11 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
}
public Result invoke(Invocation inv) throws RpcException {
if(destroyed)
if(destroyed) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
Map<String, String> attachments = new HashMap<String, String>();
if (attachment != null && attachment.size() > 0) {
......@@ -130,32 +131,29 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
attachments.putAll(invocation.getAttachments());
}
invocation.setAttachments(attachments);
RpcResult result = new RpcResult();
try {
Object obj = doInvoke(invocation);
result.setResult(obj);
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
result.setException(e);
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
result.setException(te);
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
result.setException(e);
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
result.setException(e);
return new RpcResult(e);
}
return result;
}
protected abstract Object doInvoke(Invocation invocation) throws Throwable;
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
\ No newline at end of file
......@@ -58,6 +58,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting-http</artifactId>
<version>${project.parent.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-rpc-default</artifactId>
......
......@@ -32,6 +32,7 @@
<module>dubbo-remoting-netty</module>
<module>dubbo-remoting-mina</module>
<module>dubbo-remoting-grizzly</module>
<module>dubbo-remoting-http</module>
<module>dubbo-rpc</module>
<module>dubbo-rpc-default</module>
<module>dubbo-rpc-injvm</module>
......@@ -62,6 +63,8 @@
<zookeeper_version>3.3.3</zookeeper_version>
<jfreechart_version>1.0.13</jfreechart_version>
<hessian_version>4.0.7</hessian_version>
<servlet_version>2.5</servlet_version>
<jetty_version>6.1.26</jetty_version>
<!-- Log libs -->
<log4j_version>1.2.16</log4j_version>
<slf4j_version>1.6.2</slf4j_version>
......@@ -141,6 +144,16 @@
<artifactId>hessian</artifactId>
<version>${hessian_version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet_version}</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>${jetty_version}</version>
</dependency>
<!-- Log libs -->
<dependency>
<groupId>log4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册