提交 7b1d5fe9 编写于 作者: A allenxwang

Merge pull request #85 from allenxwang/cp

Provide proper shutdown mechanism for load balancer, HttpClient, and RestClient 
......@@ -310,8 +310,9 @@ public class PrimeConnections {
return executorService.submit(ftConn);
}
void shutdown() {
public void shutdown() {
executorService.shutdown();
Monitors.unregisterObject(name + "_PrimeConnection", this);
}
private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) {
......
......@@ -855,4 +855,13 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
boolean enablePrimingConnections) {
this.enablePrimingConnections = enablePrimingConnections;
}
public void shutdown() {
cancelPingTask();
if (primeConnections != null) {
primeConnections.shutdown();
}
Monitors.unregisterObject("LoadBalancer_" + name, this);
Monitors.unregisterObject("Rule_" + name, this.getRule());
}
}
......@@ -278,6 +278,9 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
public void run() {
if (!serverRefreshEnabled) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
......@@ -353,10 +356,17 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString());
sb.append("ServerList:" + String.valueOf(serverListImpl));
return sb.toString();
}
@Override
public void shutdown() {
super.shutdown();
stopServerListRefreshing();
}
}
/*
*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.utils;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import com.netflix.config.DynamicIntProperty;
/**
* A {@link ScheduledThreadPoolExecutor} whose core size can be dynamically changed by a given {@link DynamicIntProperty} and
* registers itself with a shutdown hook to shut down.
*
* @author awang
*
*/
public class ScheduledThreadPoolExectuorWithDynamicSize extends ScheduledThreadPoolExecutor {
private final Thread shutdownThread;
public ScheduledThreadPoolExectuorWithDynamicSize(final DynamicIntProperty corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize.get(), threadFactory);
corePoolSize.addCallback(new Runnable() {
public void run() {
setCorePoolSize(corePoolSize.get());
}
});
shutdownThread = new Thread(new Runnable() {
public void run() {
shutdown();
if (shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
}
}
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
......@@ -95,6 +95,13 @@ public class LoadBalancerContextTest {
HttpRequest newRequest = context.computeFinalUriWithLoadBalancer(request);
assertEquals(uri + queryString, newRequest.getUri().toString());
}
@Test
public void testComputeFinalUriWithLoadBalancer_regressionRaw() throws ClientException {
HttpRequest request = HttpRequest.newBuilder().uri("/test?ampersand=foo%26bar").build();
HttpRequest newRequest = context.computeFinalUriWithLoadBalancer(request);
assertEquals("http://www.example.com:8080/test?ampersand=foo%26bar", newRequest.getUri().toString());
}
}
class MyLoadBalancerContext extends LoadBalancerContext<HttpRequest, HttpResponse> {
......
......@@ -60,6 +60,9 @@ public class DiscoveryEnabledNIWSServerList extends AbstractServerList<Discovery
public void initWithNiwsConfig(IClientConfig clientConfig) {
clientName = clientConfig.getClientName();
vipAddresses = clientConfig.resolveDeploymentContextbasedVipAddresses();
if (vipAddresses == null) {
throw new NullPointerException("VIP address for client " + clientName + " is null");
}
isSecure = Boolean.parseBoolean(""+clientConfig.getProperty(CommonClientConfigKey.IsSecure, "false"));
prioritizeVipAddressBasedServers = Boolean.parseBoolean(""+clientConfig.getProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, prioritizeVipAddressBasedServers));
datacenter = ConfigurationManager.getDeploymentContext().getDeploymentDatacenter();
......@@ -100,8 +103,6 @@ public class DiscoveryEnabledNIWSServerList extends AbstractServerList<Discovery
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
......
......@@ -17,8 +17,8 @@
*/
package com.netflix.http4;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionManager;
......@@ -40,21 +40,21 @@ public class ConnectionPoolCleaner {
String name = "default";
ClientConnectionManager connMgr;
Timer timer;
ScheduledExecutorService scheduler;
private DynamicIntProperty connIdleEvictTimeMilliSeconds
= DynamicPropertyFactory.getInstance().getIntProperty("default.nfhttpclient.connIdleEvictTimeMilliSeconds",
NFHttpClientConstants.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS);
boolean enableConnectionPoolCleanerTask = false;
volatile boolean enableConnectionPoolCleanerTask = false;
long connectionCleanerTimerDelay = 10;
long connectionCleanerRepeatInterval = NFHttpClientConstants.DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS;
private volatile ScheduledFuture<?> scheduledFuture;
public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr){
public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr, ScheduledExecutorService scheduler){
this.name = name;
this.connMgr = connMgr;
this.connMgr = connMgr;
this.scheduler = scheduler;
}
public DynamicIntProperty getConnIdleEvictTimeMilliSeconds() {
......@@ -94,28 +94,22 @@ public class ConnectionPoolCleaner {
}
public void initTask(){
if (enableConnectionPoolCleanerTask){
timer = new Timer(name + "-ConnectionPoolCleanerThread", true);
timer.schedule(new TimerTask() {
public void run() {
try {
if (enableConnectionPoolCleanerTask) {
scheduledFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
if (enableConnectionPoolCleanerTask) {
logger.debug("Connection pool clean up started for client {}", name);
cleanupConnections();
} catch (Throwable e) {
logger.error("Exception in ConnectionPoolCleanerThread",e);
//e.printStackTrace();
} else if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
} catch (Throwable e) {
logger.error("Exception in ConnectionPoolCleanerThread",e);
}
}, connectionCleanerTimerDelay, connectionCleanerRepeatInterval);
logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name);
// Add it to the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run()
{
logger.info("Stopping the ConnectionPoolCleaner Update Task");
timer.cancel();
}
}));
}, connectionCleanerTimerDelay, connectionCleanerRepeatInterval, TimeUnit.MILLISECONDS);
logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name);
}
}
......@@ -124,6 +118,13 @@ public class ConnectionPoolCleaner {
connMgr.closeIdleConnections(connIdleEvictTimeMilliSeconds.get(), TimeUnit.MILLISECONDS);
}
public void shutdown() {
enableConnectionPoolCleanerTask = false;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
public String toString(){
StringBuilder sb = new StringBuilder();
......@@ -136,4 +137,5 @@ public class ConnectionPoolCleaner {
}
}
......@@ -83,6 +83,4 @@ public class MonitoredConnectionManager extends ThreadSafeClientConnManager {
// TODO Auto-generated method stub
return super.requestConnection(route, state);
}
}
......@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -44,6 +46,7 @@ import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
......@@ -54,6 +57,7 @@ import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import com.netflix.utils.ScheduledThreadPoolExectuorWithDynamicSize;
/**
* Netflix extension of Apache 4.0 HttpClient
......@@ -67,7 +71,11 @@ public class NFHttpClient extends DefaultHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(NFHttpClient.class);
protected static final String EXECUTE_TRACER = "HttpClient-ExecuteTimer";
private static final DynamicIntProperty CORE_SIZE = new DynamicIntProperty("NFHttpClient.connectionPoolCleanerNumberCoreThreads", 2);
private static ScheduledExecutorService connectionPoolCleanUpScheduler;
private HttpHost httpHost = null;
private HttpRoute httpRoute = null;
......@@ -87,6 +95,13 @@ public class NFHttpClient extends DefaultHttpClient {
private DynamicIntProperty maxTotalConnectionProperty;
private DynamicIntProperty maxConnectionPerHostProperty;
static {
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true)
.setNameFormat("Connection pool clean up thread")
.build();
connectionPoolCleanUpScheduler = new ScheduledThreadPoolExectuorWithDynamicSize(CORE_SIZE, factory);
}
protected NFHttpClient(String host, int port){
super(new ThreadSafeClientConnManager());
this.name = "UNNAMED_" + numNonNamedHttpClients.incrementAndGet();
......@@ -128,7 +143,7 @@ public class NFHttpClient extends DefaultHttpClient {
defaultHeaders.add(new BasicHeader("X-netflix-httpclientname", name));
params.setParameter(ClientPNames.DEFAULT_HEADERS, defaultHeaders);
connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager());
connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager(), connectionPoolCleanUpScheduler);
this.retriesProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient" + ".retries", 3);
this.sleepTimeFactorMsProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient"+ ".sleepTimeFactorMs", 10);
......@@ -292,4 +307,11 @@ public class NFHttpClient extends DefaultHttpClient {
sw.stop();
}
}
}
\ No newline at end of file
public void shutdown() {
if (connPoolCleaner != null) {
connPoolCleaner.shutdown();
}
getConnectionManager().shutdown();
}
}
......@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.http.client.HttpClient;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
......@@ -89,9 +88,9 @@ public class NFHttpClientFactory {
}
public static void shutdownNFHttpClient(String name) {
HttpClient c = namedClientMap.get(name);
if(c != null) {
c.getConnectionManager().shutdown();
NFHttpClient c = namedClientMap.get(name);
if (c != null) {
c.shutdown();
namedClientMap.remove(name);
Monitors.unregisterObject(name, c);
}
......
......@@ -55,6 +55,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
private Counter deleteCounter;
private Timer requestTimer;
private Timer creationTimer;
private String name;
public NamedConnectionPool(String name, ClientConnectionOperator operator,
ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL,
......@@ -100,6 +101,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
deleteCounter = Monitors.newCounter(name + "_Delete");
requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS);
creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS);
this.name = name;
Monitors.registerObject(name, this);
}
......@@ -180,4 +182,9 @@ public class NamedConnectionPool extends ConnPoolByRoute {
return this.getConnectionsInPool();
}
@Override
public void shutdown() {
super.shutdown();
Monitors.unregisterObject(name, this);
}
}
......@@ -26,12 +26,8 @@ import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.ws.rs.core.MultivaluedMap;
import com.netflix.client.ClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.client.HttpClient;
import org.apache.http.client.UserTokenHandler;
......@@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory;
import com.netflix.client.AbstractLoadBalancerAwareClient;
import com.netflix.client.ClientException;
import com.netflix.client.ClientRequest;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
......@@ -68,10 +64,11 @@ import com.netflix.http4.NFHttpClientConstants;
import com.netflix.http4.NFHttpClientFactory;
import com.netflix.http4.NFHttpMethodRetryHandler;
import com.netflix.http4.ssl.KeyStoreAwareSocketFactory;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.niws.cert.AbstractSslContextFactory;
import com.netflix.niws.client.ClientSslSocketFactoryException;
import com.netflix.niws.client.URLSslContextFactory;
import com.netflix.niws.client.http.HttpClientRequest.Verb;
import com.netflix.util.Pair;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
......@@ -685,4 +682,12 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
}
return super.deriveHostAndPortFromVipAddress(vipAddress);
}
public void shutdown() {
ILoadBalancer lb = this.getLoadBalancer();
if (lb instanceof BaseLoadBalancer) {
((BaseLoadBalancer) lb).shutdown();
}
NFHttpClientFactory.shutdownNFHttpClient(restClientName);
}
}
......@@ -17,6 +17,7 @@
*/
package com.netflix.http4;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -28,8 +29,11 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.Test;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.config.ConfigurationManager;
import com.netflix.niws.client.http.RestClient;
public class NamedConnectionPoolTest {
@Test
......@@ -69,7 +73,27 @@ public class NamedConnectionPoolTest {
ConfigurationManager.getConfigInstance().setProperty("google-NamedConnectionPoolTest.ribbon." + CommonClientConfigKey.MaxHttpConnectionsPerHost.key(), "10");
assertEquals(50, connectionPoolManager.getMaxTotal());
assertEquals(10, connectionPoolManager.getDefaultMaxPerRoute());
}
@Test
public void testConnectionPoolCleaner() throws Exception {
// LogManager.getRootLogger().setLevel((Level)Level.DEBUG);
ConfigurationManager.getConfigInstance().setProperty("ConnectionPoolCleanerTest.ribbon." + CommonClientConfigKey.ConnIdleEvictTimeMilliSeconds, "100");
ConfigurationManager.getConfigInstance().setProperty("ConnectionPoolCleanerTest.ribbon." + CommonClientConfigKey.ConnectionCleanerRepeatInterval, "500");
RestClient client = (RestClient) ClientFactory.getNamedClient("ConnectionPoolCleanerTest");
NFHttpClient httpclient = NFHttpClientFactory.getNamedNFHttpClient("ConnectionPoolCleanerTest");
assertNotNull(httpclient);
com.netflix.client.http.HttpResponse response = null;
try {
response = client.execute(HttpRequest.newBuilder().uri("http://www.google.com/").build());
} finally {
if (response != null) {
response.close();
}
}
MonitoredConnectionManager connectionPoolManager = (MonitoredConnectionManager) httpclient.getConnectionManager();
Thread.sleep(2000);
assertEquals(0, connectionPoolManager.getConnectionsInPool());
client.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册