提交 17aa99e8 编写于 作者: K ken.lj

Upgrade rest integration

上级 f4dfc6e4
......@@ -112,7 +112,7 @@
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
......
......@@ -16,9 +16,17 @@
*/
package com.alibaba.dubbo.config.spring;
import com.alibaba.dubbo.config.*;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ModuleConfig;
import com.alibaba.dubbo.config.MonitorConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ProviderConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.ServiceConfig;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
......@@ -258,4 +266,11 @@ public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean
unexport();
}
// merged from dubbox
protected Class getServiceClass(T ref) {
if (AopUtils.isAopProxy(ref)) {
return AopUtils.getTargetClass(ref);
}
return super.getServiceClass(ref);
}
}
\ No newline at end of file
......@@ -38,6 +38,16 @@ limitations under the License.
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -60,6 +60,20 @@ limitations under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
......
......@@ -158,7 +158,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<artifactId>javax.servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
......
......@@ -38,6 +38,12 @@
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
......
......@@ -25,6 +25,7 @@ import com.alibaba.dubbo.remoting.http.HttpHandler;
import com.alibaba.dubbo.remoting.http.servlet.DispatcherServlet;
import com.alibaba.dubbo.remoting.http.servlet.ServletManager;
import com.alibaba.dubbo.remoting.http.support.AbstractHttpServer;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
......@@ -39,16 +40,17 @@ public class JettyHttpServer extends AbstractHttpServer {
private static final Logger logger = LoggerFactory.getLogger(JettyHttpServer.class);
private Server server;
private URL url;
public JettyHttpServer(URL url, final HttpHandler handler) {
super(url, handler);
this.url = url;
// TODO we should leave this setting to slf4j
// we must disable the debug logging for production use
Log.setLog(new StdErrLog());
Log.getLog().setDebugEnabled(false);
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
......@@ -72,13 +74,13 @@ public class JettyHttpServer extends AbstractHttpServer {
ServletHandler servletHandler = new ServletHandler();
ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
servletHolder.setInitOrder(2);
// dubbo's original impl can't support the use of ServletContext
// server.addHandler(servletHandler);
// server.addHandler(servletHandler);
// TODO Context.SESSIONS is the best option here?
Context context = new Context(server, "/", Context.SESSIONS);
context.setServletHandler(servletHandler);
ServletManager.getInstance().addServletContext(url.getPort(), context.getServletContext());
ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());
try {
server.start();
......@@ -90,7 +92,10 @@ public class JettyHttpServer extends AbstractHttpServer {
public void close() {
super.close();
ServletManager.getInstance().removeServletContext(url.getPort());
//
ServletManager.getInstance().removeServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
if (server != null) {
try {
server.stop();
......
......@@ -21,9 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* TODO this may not be a pretty elegant solution,
* and we may need to make change to the whole remoting-http architecture in the future
*
* @author lishen
*/
public class ServletManager {
......
......@@ -68,13 +68,6 @@ public class RpcContext {
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
// now we don't use the 'values' map to hold these objects
// we want these objects to be as generic as possible
private Object request;
private Object response;
@Deprecated
private List<Invoker<?>> invokers;
@Deprecated
......@@ -82,11 +75,34 @@ public class RpcContext {
@Deprecated
private Invocation invocation;
// now we don't use the 'values' map to hold these objects
// we want these objects to be as generic as possible
private Object request;
private Object response;
protected RpcContext() {
}
/**
* Get the request object of the underlying RPC protocol, e.g. HttServletRequest
* get context.
*
* @return context
*/
public static RpcContext getContext() {
return LOCAL.get();
}
/**
* remove context.
*
* @see com.alibaba.dubbo.rpc.filter.ContextFilter
*/
public static void removeContext() {
LOCAL.remove();
}
/**
* Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
*
* @return null if the underlying protocol doesn't provide support for getting request
*/
......@@ -94,12 +110,23 @@ public class RpcContext {
return request;
}
/**
* Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
*
* @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type
*/
@SuppressWarnings("unchecked")
public <T> T getRequest(Class<T> clazz) {
return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
}
public void setRequest(Object request) {
this.request = request;
}
/**
* Get the response object of the underlying RPC protocol, e.g. HttServletResponse
* Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
*
* @return null if the underlying protocol doesn't provide support for getting response
*/
......@@ -107,26 +134,18 @@ public class RpcContext {
return response;
}
public void setResponse(Object response) {
this.response = response;
}
/**
* get context.
* Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
*
* @return context
* @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
*/
public static RpcContext getContext() {
return LOCAL.get();
@SuppressWarnings("unchecked")
public <T> T getResponse(Class<T> clazz) {
return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
}
/**
* remove context.
*
* @see com.alibaba.dubbo.rpc.filter.ContextFilter
*/
public static void removeContext() {
LOCAL.remove();
public void setResponse(Object response) {
this.response = response;
}
/**
......
package com.alibaba.dubbo.rpc;
/**
* TODO this is just a workround for rest protocol, and now we just ensure it works in the most common dubbo usages
* TODO this is just a workaround for rest protocol, and now we just ensure it works in the most common dubbo usages
*
* @author lishen
*/
......
......@@ -50,9 +50,20 @@ public class ContextFilter implements Filter {
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setAttachments(attachments)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
......
......@@ -60,7 +60,7 @@
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-netty</artifactId>
<artifactId>resteasy-netty4</artifactId>
</dependency>
<dependency>
......
......@@ -17,13 +17,17 @@ package com.alibaba.dubbo.rpc.protocol.rest;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import io.netty.channel.ChannelOption;
import org.jboss.resteasy.plugins.server.netty.NettyJaxrsServer;
import org.jboss.resteasy.spi.ResteasyDeployment;
import java.util.HashMap;
import java.util.Map;
/**
* Netty server can't support @Context injection of servlet objects since it's not a servlet container
*
* @author lishen
*/
public class NettyServer extends BaseRestServer {
......@@ -31,7 +35,9 @@ public class NettyServer extends BaseRestServer {
protected void doStart(URL url) {
server.setPort(url.getPort());
server.setKeepAlive(url.getParameter(Constants.KEEP_ALIVE_KEY, true));
Map<ChannelOption, Object> channelOption = new HashMap<ChannelOption, Object>();
channelOption.put(ChannelOption.SO_KEEPALIVE, url.getParameter(Constants.KEEP_ALIVE_KEY, Constants.DEFAULT_KEEP_ALIVE));
server.setChildChannelOptions(channelOption);
server.setExecutorThreadCount(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS));
server.setIoWorkerCount(url.getParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
server.start();
......
......@@ -22,18 +22,19 @@ import com.alibaba.dubbo.remoting.http.HttpBinder;
import com.alibaba.dubbo.remoting.http.servlet.BootstrapListener;
import com.alibaba.dubbo.remoting.http.servlet.ServletManager;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import com.alibaba.dubbo.rpc.ServiceClassHolder;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpResponse;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
......@@ -53,7 +54,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author lishen
*/
public class RestProtocol extends AbstractProxyProtocol {
......@@ -81,7 +81,7 @@ public class RestProtocol extends AbstractProxyProtocol {
}
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = url.getIp() + ":" + url.getPort();
String addr = getAddr(url);
Class implClass = ServiceClassHolder.getInstance().popServiceClass();
RestServer server = servers.get(addr);
if (server == null) {
......@@ -131,40 +131,41 @@ public class RestProtocol extends AbstractProxyProtocol {
}
// TODO more configs to add
PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
// 20 is the default maxTotal of current PoolingClientConnectionManager
connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));
connectionMonitor.addConnectionManager(connectionManager);
// BasicHttpContext localContext = new BasicHttpContext();
DefaultHttpClient httpClient = new DefaultHttpClient(connectionManager);
httpClient.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
return Long.parseLong(value) * 1000;
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
.setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
.build();
SocketConfig socketConfig = SocketConfig.custom()
.setSoKeepAlive(true)
.setTcpNoDelay(true)
.build();
CloseableHttpClient httpClient = HttpClientBuilder.create()
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
return Long.parseLong(value) * 1000;
}
}
// TODO constant
return 30 * 1000;
}
}
// TODO constant
return 30 * 1000;
}
});
HttpParams params = httpClient.getParams();
// TODO currently no xml config for Constants.CONNECT_TIMEOUT_KEY so we directly reuse Constants.TIMEOUT_KEY for now
HttpConnectionParams.setConnectionTimeout(params, url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
HttpConnectionParams.setSoTimeout(params, url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
HttpConnectionParams.setTcpNoDelay(params, true);
HttpConnectionParams.setSoKeepalive(params, true);
})
.setDefaultRequestConfig(requestConfig)
.setDefaultSocketConfig(socketConfig)
.build();
ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);
......@@ -231,9 +232,9 @@ public class RestProtocol extends AbstractProxyProtocol {
protected class ConnectionMonitor extends Thread {
private volatile boolean shutdown;
private final List<ClientConnectionManager> connectionManagers = Collections.synchronizedList(new LinkedList<ClientConnectionManager>());
private final List<PoolingHttpClientConnectionManager> connectionManagers = Collections.synchronizedList(new LinkedList<PoolingHttpClientConnectionManager>());
public void addConnectionManager(ClientConnectionManager connectionManager) {
public void addConnectionManager(PoolingHttpClientConnectionManager connectionManager) {
connectionManagers.add(connectionManager);
}
......@@ -242,7 +243,7 @@ public class RestProtocol extends AbstractProxyProtocol {
while (!shutdown) {
synchronized (this) {
wait(1000);
for (ClientConnectionManager connectionManager : connectionManagers) {
for (PoolingHttpClientConnectionManager connectionManager : connectionManagers) {
connectionManager.closeExpiredConnections();
// TODO constant
connectionManager.closeIdleConnections(30, TimeUnit.SECONDS);
......
......@@ -105,7 +105,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
......
......@@ -100,6 +100,14 @@ limitations under the License.
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-logging-juli</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......@@ -202,7 +210,7 @@ limitations under the License.
</exclusion>
<exclusion>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-netty</artifactId>
<artifactId>resteasy-netty4</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.resteasy</groupId>
......@@ -398,6 +406,7 @@ limitations under the License.
<include>com.alibaba:dubbo-rpc-thrift</include>
<include>com.alibaba:dubbo-rpc-memcached</include>
<include>com.alibaba:dubbo-rpc-redis</include>
<include>com.alibaba:dubbo-rpc-rest</include>
<include>com.alibaba:dubbo-filter-validation</include>
<include>com.alibaba:dubbo-filter-cache</include>
<include>com.alibaba:dubbo-cluster</include>
......
......@@ -102,7 +102,7 @@ limitations under the License.
<thrift_version>0.8.0</thrift_version>
<jfreechart_version>1.0.13</jfreechart_version>
<hessian_version>4.0.38</hessian_version>
<servlet_version>2.5</servlet_version>
<servlet_version>3.1.0</servlet_version>
<jetty_version>6.1.26</jetty_version>
<validation_version>1.1.0.Final</validation_version>
<hibernate_validator_version>5.4.1.Final</hibernate_validator_version>
......@@ -113,11 +113,11 @@ limitations under the License.
<cglib_version>2.2</cglib_version>
<webx_version>3.1.6</webx_version>
<velocity_version>1.7</velocity_version>
<kryo_version>2.24.0</kryo_version>
<kryo_serializers_version>0.26</kryo_serializers_version>
<fst_version>1.55</fst_version>
<kryo_version>4.0.1</kryo_version>
<kryo_serializers_version>0.42</kryo_serializers_version>
<fst_version>2.48-jdk-6</fst_version>
<rs_api_version>2.0</rs_api_version>
<resteasy_version>3.0.7.Final</resteasy_version>
<resteasy_version>3.1.4.Final</resteasy_version>
<tomcat_embed_version>8.0.11</tomcat_embed_version>
<!-- Log libs -->
<slf4j_version>1.7.25</slf4j_version>
......@@ -261,7 +261,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<artifactId>javax.servlet-api</artifactId>
<version>${servlet_version}</version>
</dependency>
<dependency>
......@@ -310,7 +310,7 @@ limitations under the License.
<version>${velocity_version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo_version}</version>
</dependency>
......@@ -341,7 +341,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-netty</artifactId>
<artifactId>resteasy-netty4</artifactId>
<version>${resteasy_version}</version>
</dependency>
<dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册