/* * * Copyright 2013 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package com.netflix.client; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.Server; import com.netflix.servo.monitor.Counter; import com.netflix.servo.monitor.Monitors; import com.netflix.servo.monitor.Stopwatch; import com.netflix.servo.monitor.Timer; /** * Prime the connections for a given Client (For those Client that * have a LoadBalancer that knows the set of Servers it will connect to) This is * mainly done to address those deployment environments (Read EC2) which benefit * from a firewall connection/path warmup prior to actual use for live requests. *
* This class is not protocol specific. The actual priming operation is delegated to
* an instance of {@link IPrimeConnection}, which is instantiated using reflection
* according to the property {@link CommonClientConfigKey#PrimeConnectionsClassName}.
*
* @author stonse
* @author awang
*
*/
public class PrimeConnections {
public static interface PrimeConnectionListener {
public void primeCompleted(Server s, Throwable lastException);
}
/**
 * Bundle of atomic counters describing one priming run: the number of
 * servers in the run, how many are still left, and how many succeeded.
 */
static class PrimeConnectionCounters {
    final AtomicInteger numServersLeft;
    final AtomicInteger numServers;
    final AtomicInteger numServersSuccessful;

    /**
     * Starts the "left" and "total" counters at {@code initialSize} and
     * the "successful" counter at zero.
     *
     * @param initialSize initial value for the total and remaining counters
     */
    public PrimeConnectionCounters(int initialSize) {
        // AtomicInteger() starts at 0.
        this.numServersSuccessful = new AtomicInteger();
        this.numServers = new AtomicInteger(initialSize);
        this.numServersLeft = new AtomicInteger(initialSize);
    }
}
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(PrimeConnections.class);
// Affordance to change the URI path we connect to while "priming".
// The default of "/" is good for most cases, but if that is a heavy operation
// on the server side, a more lightweight URI can be chosen.
String primeConnectionsURIPath = "/";
/**
 * Executor service for executing asynchronous requests.
 */
private ExecutorService executorService;
// Maximum pool size handed to the ThreadPoolExecutor created in setUp.
private int maxExecutorThreads = 5;
// Keep-alive time, in milliseconds, handed to the ThreadPoolExecutor created in setUp.
private long executorThreadTimeout = 30000;
// Name of this instance; replaced in setUp by the name passed to the constructor.
private String name = "default";
// NOTE(review): presumably the capacity of the executor's work queue; confirm in setUp.
private int maxTasksPerExecutorQueue = 100;
// Prime ratio; the config-based constructor reads it from MinPrimeConnectionsRatio.
// NOTE(review): presumably the fraction of servers that must be primed; confirm where it is read.
private float primeRatio = 1.0f;
// Retry limit; the config-based constructor reads it from MaxRetriesPerServerPrimeConnection.
int maxRetries = 9;
// Default total time allowed for priming (NOTE(review): presumably milliseconds; confirm).
long maxTotalTimeToPrimeConnections = 30 * 1000; // default time
long totalTimeTaken = 0; // Total time taken
// NOTE(review): presumably selects asynchronous priming; confirm where it is read.
private boolean aSync = true;
// Servo monitors (NOTE(review): judging by the names they track total attempts,
// successes and the initial priming time; confirm where they are created).
Counter totalCounter;
Counter successCounter;
Timer initialPrimeTimer;
// Protocol-specific implementation of the priming operation; the
// IClientConfig-based constructor instantiates it by reflection.
private IPrimeConnection connector;
/**
 * No-argument constructor, private so it cannot be invoked from outside
 * this class. Its body is empty, so every field keeps its declared
 * initial value.
 */
private PrimeConnections() {
}
/**
 * Creates an instance whose settings are read from the given client configuration.
 * <p>
 * The per-server retry count, the total priming time, the priming URI and
 * the minimum prime ratio are looked up in {@code niwsClientConfig}. A
 * numeric property that is missing or cannot be parsed is logged as a
 * warning and replaced by its {@link DefaultClientConfigImpl} default.
 * The {@link IPrimeConnection} class named by
 * {@link CommonClientConfigKey#PrimeConnectionsClassName} is then
 * instantiated by reflection and initialized with the same configuration.
 *
 * @param name name assigned to this instance
 * @param niwsClientConfig configuration the priming properties are read from
 * @throws RuntimeException if the connector class cannot be loaded,
 *         instantiated or initialized
 */
public PrimeConnections(String name, IClientConfig niwsClientConfig) {
    // Locals use their own names so they do not shadow the fields set later by setUp.
    int retriesPerServer = Integer.valueOf(DefaultClientConfigImpl.DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION);
    long totalPrimeTime = Long.valueOf(DefaultClientConfigImpl.DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS);
    float minPrimeRatio = DefaultClientConfigImpl.DEFAULT_MIN_PRIME_CONNECTIONS_RATIO;

    try {
        retriesPerServer = Integer.parseInt(String.valueOf(niwsClientConfig.getProperty(
                CommonClientConfigKey.MaxRetriesPerServerPrimeConnection, retriesPerServer)));
    } catch (Exception e) {
        // Keep the default, but record why the configured value was rejected.
        logger.warn("Invalid maxRetriesPerServerPrimeConnection", e);
    }

    try {
        totalPrimeTime = Long.parseLong(String.valueOf(niwsClientConfig.getProperty(
                CommonClientConfigKey.MaxTotalTimeToPrimeConnections, totalPrimeTime)));
    } catch (Exception e) {
        logger.warn("Invalid maxTotalTimeToPrimeConnections", e);
    }

    String primeConnectionsURI = String.valueOf(niwsClientConfig.getProperty(
            CommonClientConfigKey.PrimeConnectionsURI, DefaultClientConfigImpl.DEFAULT_PRIME_CONNECTIONS_URI));

    // Read the ratio like the other numeric properties: supply a default and
    // fall back to it on a bad value. Without the default, an unset property
    // is rendered as "null" and Float.parseFloat aborts the constructor.
    try {
        minPrimeRatio = Float.parseFloat(String.valueOf(niwsClientConfig.getProperty(
                CommonClientConfigKey.MinPrimeConnectionsRatio, minPrimeRatio)));
    } catch (Exception e) {
        logger.warn("Invalid minPrimeConnectionsRatio", e);
    }

    String className = (String) niwsClientConfig.getProperty(CommonClientConfigKey.PrimeConnectionsClassName,
            DefaultClientConfigImpl.DEFAULT_PRIME_CONNECTIONS_CLASS);
    try {
        connector = (IPrimeConnection) Class.forName(className).newInstance();
        connector.initWithNiwsConfig(niwsClientConfig);
    } catch (Exception e) {
        throw new RuntimeException("Unable to initialize prime connections", e);
    }

    // The executor is created in setUp, i.e. only after the connector was built successfully.
    setUp(name, retriesPerServer, totalPrimeTime, primeConnectionsURI, minPrimeRatio);
}
public PrimeConnections(String name, int maxRetries,
long maxTotalTimeToPrimeConnections, String primeConnectionsURI) {
setUp(name, maxRetries, maxTotalTimeToPrimeConnections, primeConnectionsURI, DefaultClientConfigImpl.DEFAULT_MIN_PRIME_CONNECTIONS_RATIO);
}
/**
 * Creates an instance from explicit settings, including the prime ratio.
 * All values are handed unchanged to {@code setUp}.
 *
 * @param name name assigned to this instance
 * @param maxRetries retry limit stored in {@code maxRetries}
 * @param maxTotalTimeToPrimeConnections total time allowed for priming
 * @param primeConnectionsURI URI path to connect to while priming
 * @param primeRatio prime ratio stored in {@code primeRatio}
 */
public PrimeConnections(
        String name,
        int maxRetries,
        long maxTotalTimeToPrimeConnections,
        String primeConnectionsURI,
        float primeRatio) {
    setUp(
            name,
            maxRetries,
            maxTotalTimeToPrimeConnections,
            primeConnectionsURI,
            primeRatio);
}
private void setUp(String name, int maxRetries,
long maxTotalTimeToPrimeConnections, String primeConnectionsURI, float primeRatio) {
this.name = name;
this.maxRetries = maxRetries;
this.maxTotalTimeToPrimeConnections = maxTotalTimeToPrimeConnections;
this.primeConnectionsURIPath = primeConnectionsURI;
this.primeRatio = primeRatio;
executorService = new ThreadPoolExecutor(1 /* minimum */,
maxExecutorThreads /* max threads */,
executorThreadTimeout /*
* timeout - same property as create
* timeout
*/, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue