提交 81227a13 编写于 作者: E elandau

Merge pull request #194 from aspyker/fixPrepare

Fix 30s hang when priming connections to server lists of more than 100 servers
......@@ -22,10 +22,10 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -63,17 +63,27 @@ public class PrimeConnections {
public void primeCompleted(Server s, Throwable lastException);
}
static class PrimeConnectionCounters {
final AtomicInteger numServersLeft;
final AtomicInteger numServers;
final AtomicInteger numServersSuccessful;
public PrimeConnectionCounters(int initialSize) {
numServersLeft = new AtomicInteger(initialSize);
numServers = new AtomicInteger(initialSize);
numServersSuccessful = new AtomicInteger(0);
/**
 * Immutable summary of one connection-priming run: how many servers were
 * targeted, how many attempts succeeded or failed, and the elapsed time.
 */
public static class PrimeConnectionEndStats {
    /** Number of servers the run attempted to prime. */
    public final int total;
    /** Number of servers reported as successfully primed. */
    public final int success;
    /** Number of servers whose priming attempt failed. */
    public final int failure;
    /** Elapsed time of the run, in the unit supplied by the caller. */
    public final long totalTime;

    /**
     * @param total     number of servers targeted
     * @param success   number of successful priming attempts
     * @param failure   number of failed priming attempts
     * @param totalTime elapsed time of the run
     */
    public PrimeConnectionEndStats(int total, int success, int failure, long totalTime) {
        this.totalTime = totalTime;
        this.failure = failure;
        this.success = success;
        this.total = total;
    }

    /** Renders all four counters in a single bracketed, comma-separated line. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PrimeConnectionEndStats [total=");
        text.append(total);
        text.append(", success=").append(success);
        text.append(", failure=").append(failure);
        text.append(", totalTime=").append(totalTime);
        text.append("]");
        return text.toString();
    }
}
private static final Logger logger = LoggerFactory.getLogger(PrimeConnections.class);
// affordance to change the URI we connect to while "priming"
......@@ -93,11 +103,8 @@ public class PrimeConnections {
private String name = "default";
private int maxTasksPerExecutorQueue = 100;
private float primeRatio = 1.0f;
int maxRetries = 9;
long maxTotalTimeToPrimeConnections = 30 * 1000; // default time
......@@ -112,6 +119,8 @@ public class PrimeConnections {
private IPrimeConnection connector;
private PrimeConnectionEndStats stats;
private PrimeConnections() {
}
......@@ -169,7 +178,7 @@ public class PrimeConnections {
* timeout - same property as create
* timeout
*/, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxTasksPerExecutorQueue)
new LinkedBlockingQueue<Runnable>()
/* Bounded queue with FIFO- bounded to max tasks */,
new ASyncPrimeConnectionsThreadFactory(name) /*
* So we can give
......@@ -224,24 +233,32 @@ public class PrimeConnections {
} finally {
stopWatch.stop();
}
printStats(totalCount, successCount.get(), failureCount.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
stats = new PrimeConnectionEndStats(totalCount, successCount.get(), failureCount.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
printStats(stats);
}
private void printStats(int total, int success, int failure, long totalTime) {
if (total != success) {
/**
 * Returns the end-of-run statistics currently held by this instance.
 *
 * @return the stored {@link PrimeConnectionEndStats}; presumably
 *         {@code null} until a priming run has recorded its results
 *         (TODO confirm against the code that assigns the field)
 */
public PrimeConnectionEndStats getEndStats() {
    return this.stats;
}
private void printStats(PrimeConnectionEndStats stats) {
if (stats.total != stats.success) {
logger.info("Priming Connections not fully successful");
} else {
logger.info("Priming connections fully successful");
}
logger.debug("numServers left to be 'primed'="
+ (total - success));
logger.debug("numServers successfully 'primed'=" + success);
+ (stats.total - stats.success));
logger.debug("numServers successfully 'primed'=" + stats.success);
logger
.debug("numServers whose attempts not complete exclusively due to max time allocated="
+ (total - (success + failure)));
logger.debug("Total Time Taken=" + totalTime
+ (stats.total - (stats.success + stats.failure)));
logger.debug("Total Time Taken=" + stats.totalTime
+ " msecs, out of an allocated max of (msecs)="
+ maxTotalTimeToPrimeConnections);
logger.debug("stats = " + stats);
}
/*
......@@ -286,8 +303,12 @@ public class PrimeConnections {
try {
ftC = makeConnectionASync(s, listener);
ftList.add(ftC);
} catch (Throwable e) { // NOPMD
}
catch (RejectedExecutionException ree) {
logger.error("executor submit failed", ree);
}
catch (Exception e) {
logger.error("general error", e);
// It does not really matter if there was an exception,
// the goal here is to attempt "priming/opening" the route
// in ec2 .. actual http results do not matter
......@@ -300,7 +321,7 @@ public class PrimeConnections {
}
private Future<Boolean> makeConnectionASync(final Server s,
final PrimeConnectionListener listener) throws InterruptedException, ExecutionException {
final PrimeConnectionListener listener) throws InterruptedException, RejectedExecutionException {
Callable<Boolean> ftConn = new Callable<Boolean>() {
public Boolean call() throws Exception {
logger.debug("calling primeconnections ...");
......
package com.netflix.niws.client.http;
import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.commons.configuration.Configuration;
import org.junit.*;
import com.netflix.client.ClientFactory;
import com.netflix.client.PrimeConnections.PrimeConnectionEndStats;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.ConfigurationManager;
import com.netflix.loadbalancer.AbstractServerList;
import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
import com.netflix.loadbalancer.Server;
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.net.httpserver.HttpServer;
/**
 * Tests connection priming through a {@link DynamicServerListLoadBalancer}
 * against a local HTTP server, once with a small server list and once with a
 * large one.
 *
 * <p>Every entry in the fixed server lists points at the same local test
 * server, so the list size only controls how many priming attempts are made.
 */
public class PrimeConnectionsTest {
    private static String SERVICE_URI;

    // Random port in [4000, 4999] for the local test server.
    private static int port = (new Random()).nextInt(1000) + 4000;

    private static HttpServer server = null;

    private static final int SMALL_FIXED_SERVER_LIST_SIZE = 10;
    private static final int LARGE_FIXED_SERVER_LIST_SIZE = 200;

    /** Server list with {@link #LARGE_FIXED_SERVER_LIST_SIZE} entries. */
    public static class LargeFixedServerList extends FixedServerList {
        public LargeFixedServerList() {
            // Use the shared constant so the list size and the assertion cannot drift apart.
            super(LARGE_FIXED_SERVER_LIST_SIZE);
        }
    }

    /** Server list with {@link #SMALL_FIXED_SERVER_LIST_SIZE} entries. */
    public static class SmallFixedServerList extends FixedServerList {
        public SmallFixedServerList() {
            super(SMALL_FIXED_SERVER_LIST_SIZE);
        }
    }

    /**
     * Server list that always returns the same fixed-size list, in which every
     * element is the single local test server.
     */
    public static class FixedServerList extends AbstractServerList<Server> {
        private Server testServer = new Server("localhost", port);
        private List<Server> testServers;

        /**
         * @param repeatCount number of times the test server appears in the list
         */
        public FixedServerList(int repeatCount) {
            Server[] list = new Server[repeatCount];
            Arrays.fill(list, testServer);
            testServers = Arrays.asList(list);
        }

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            // No configuration needed: the list is fixed at construction time.
        }

        @Override
        public List<Server> getInitialListOfServers() {
            return testServers;
        }

        @Override
        public List<Server> getUpdatedListOfServers() {
            return testServers;
        }
    }

    /** Starts the local HTTP server that all priming attempts connect to. */
    @BeforeClass
    public static void setup() {
        PackagesResourceConfig resourceConfig = new PackagesResourceConfig("com.netflix.niws.client.http");
        SERVICE_URI = "http://localhost:" + port + "/";
        try {
            server = HttpServerFactory.create(SERVICE_URI, resourceConfig);
            server.start();
        } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }

    /** Priming a list of {@link #SMALL_FIXED_SERVER_LIST_SIZE} servers must succeed for all of them. */
    @Test
    public void testPrimeConnectionsSmallPool() throws Exception {
        Configuration config = ConfigurationManager.getConfigInstance();
        config.setProperty("PrimeConnectionsTest1.ribbon.NFLoadBalancerClassName",
                com.netflix.loadbalancer.DynamicServerListLoadBalancer.class.getName());
        config.setProperty("PrimeConnectionsTest1.ribbon.NIWSServerListClassName", SmallFixedServerList.class.getName());
        config.setProperty("PrimeConnectionsTest1.ribbon.EnablePrimeConnections", "true");
        DynamicServerListLoadBalancer<Server> lb = (DynamicServerListLoadBalancer<Server>) ClientFactory.getNamedLoadBalancer("PrimeConnectionsTest1");
        PrimeConnectionEndStats stats = lb.getPrimeConnections().getEndStats();
        // Fail with a clear message instead of an NPE if no stats were recorded.
        assertNotNull(stats);
        // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
        assertEquals(SMALL_FIXED_SERVER_LIST_SIZE, stats.success);
    }

    /** Priming a list of {@link #LARGE_FIXED_SERVER_LIST_SIZE} servers must succeed for all of them. */
    @Test
    public void testPrimeConnectionsLargePool() throws Exception {
        Configuration config = ConfigurationManager.getConfigInstance();
        config.setProperty("PrimeConnectionsTest2.ribbon.NFLoadBalancerClassName",
                com.netflix.loadbalancer.DynamicServerListLoadBalancer.class.getName());
        config.setProperty("PrimeConnectionsTest2.ribbon.NIWSServerListClassName", LargeFixedServerList.class.getName());
        config.setProperty("PrimeConnectionsTest2.ribbon.EnablePrimeConnections", "true");
        DynamicServerListLoadBalancer<Server> lb = (DynamicServerListLoadBalancer<Server>) ClientFactory.getNamedLoadBalancer("PrimeConnectionsTest2");
        PrimeConnectionEndStats stats = lb.getPrimeConnections().getEndStats();
        // Fail with a clear message instead of an NPE if no stats were recorded.
        assertNotNull(stats);
        assertEquals(LARGE_FIXED_SERVER_LIST_SIZE, stats.success);
    }

    /** Stops the local HTTP server if it was created. */
    @AfterClass
    public static void shutDown() {
        // setup() may have failed before assigning the server; avoid masking that failure with an NPE.
        if (server != null) {
            server.stop(0);
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册