提交 b55a83ee 编写于 作者: A Allen Wang

Updates for findbugs.

上级 0d983ef5
......@@ -24,6 +24,7 @@ subprojects {
compile 'com.netflix.servo:servo-core:0.4.32'
compile 'com.google.guava:guava:11.0.2'
compile 'com.netflix.archaius:archaius-core:0.5.3'
compile 'com.netflix.netflix-commons:netflix-commons-util:0.1.0'
compile 'javax.ws.rs:jsr311-api:1.1.1'
compile 'commons-collections:commons-collections:3.2.1'
testCompile 'junit:junit:4.10'
......@@ -32,6 +33,9 @@ subprojects {
}
project(':ribbon-core') {
dependencies {
compile 'com.netflix.netflix-commons:netflix-statistics:0.1.0'
}
}
project(':ribbon-httpclient') {
......
package com.netflix.loadbalancer;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Class that simulates a multi-thread user request environment to test
* faked server alive/dead scenario and LB performance
* TODO: Move this to junit test
* @author stonse
*
*/
public class LBMultiThreadedTest {
static int numServers = 10;
static int serverDie = 3; /* out of numServers */
static int serverRevive = 2; /* out of 10000 */
static int megadeathEvent = 1; /* out of 10000 */
static int numRequestsPerSimulUser = 100;
static int numSimulUser = 100;
static int numRequests = numRequestsPerSimulUser * numSimulUser;
static final int oddDenom = 10;
static Server[] allServers = new Server[numServers];
static HashMap<String, Boolean> isAliveMap = new HashMap<String, Boolean>();
static ServerComparator serverComparator = new ServerComparator();
public static void main(String[] argv) {
for (int i = 0; i < numServers; i++) {
allServers[i] = new Server("" + i, 80);
isAliveMap.put(allServers[i].getId(), new Boolean(true));
}
Thread upDownThread = new UpDownThread();
upDownThread.setDaemon(true);
upDownThread.start();
NFLoadBalancer lb = new NFLoadBalancer();
lb.setPingInterval(20);
lb.setMaxTotalPingTime(5);
lb.setPing(new PingFake());
//lb.setRule(new RetryRule(new CustomerHashRule(), 200));
//lb.setRule(new RoundRobinRule());
lb.setRule(new RetryRule(new RoundRobinRule(), 200));
lb.addServers(allServers);
System.out.println("Zzzz");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
lb.forceQuickPing();
AtomicInteger[] timesChosen = new AtomicInteger[numServers];
AtomicInteger failedCount = new AtomicInteger(0);
AtomicInteger threadsDone = new AtomicInteger(0);
AtomicInteger totalUsers = new AtomicInteger(0);
AtomicInteger totalSwitches = new AtomicInteger(0);
for (int i = 0; i < numServers; i++) {
timesChosen[i] = new AtomicInteger(0);
}
Thread[] userThreads = new Thread[numSimulUser];
for (int i = 0; i < numSimulUser; i++) {
userThreads[i] = new UserThread(lb, numRequestsPerSimulUser,
threadsDone, failedCount, timesChosen, totalUsers,
totalSwitches);
}
System.out.println("Starting user threads");
long startTime = System.currentTimeMillis();
for (Thread userThread : userThreads) {
userThread.start();
}
while (threadsDone.get() < numSimulUser) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
long stopTime = System.currentTimeMillis();
long elapsedTime = stopTime - startTime;
double avt = (elapsedTime * 1.0) / numRequests;
System.out.println("\nFinished " + numRequests + " requests -- "
+ elapsedTime + " ms (avg " + avt + ")\n");
System.out.println("-------------------");
System.out.println("Simul users: " + numSimulUser);
System.out.println("Total users: " + totalUsers);
System.out.println("Switch/Fail: " + totalSwitches);
System.out.println("SF/US " + (totalSwitches.intValue())
/ (1.0 * totalUsers.intValue()));
System.out.println("Total failures: " + failedCount.intValue());
for (int i = 0; i < numServers; i++) {
System.out.println("fakeserve" + i + "\t"
+ timesChosen[i].intValue());
}
}
static class UpDownThread extends Thread {
Random rand = new Random();
public void run() {
while (true) {
if (allServers != null) {
if (rand.nextInt(oddDenom) <= megadeathEvent) {
/* it's the end of the world as we know it */
for (Server server : allServers) {
isAliveMap.put(server.getId(), new Boolean(false));
}
} else {
for (Server server : allServers) {
String hostPortServlet = server.getId();
Boolean currentStatus = isAliveMap
.get(hostPortServlet);
Boolean newStatus = null;
if (currentStatus == null) {
currentStatus = new Boolean(true);
}
if (currentStatus) {
newStatus = new Boolean(
rand.nextInt(oddDenom) >= serverDie);
} else {
newStatus = new Boolean(
rand.nextInt(oddDenom) < serverRevive);
}
isAliveMap.put(hostPortServlet, newStatus);
}
}
} else {
Thread.yield();
}
}
}
}
/**
* Simulates a User making a Request
* @author stonse
*
*/
static class UserThread extends Thread {
int numRequests = 0;
NFLoadBalancer lb;
Random rand = new Random();
int requestIdx = 0;
AtomicInteger threadsDone;
AtomicInteger failedCount;
AtomicInteger totalUsers;
AtomicInteger totalSwitches;
AtomicInteger[] timesChosen;
byte[] fakeCustomerBytes = new byte[4];
IRule ir = null;
public UserThread(NFLoadBalancer lb, int numRequests,
AtomicInteger threadsDone, AtomicInteger failedCount,
AtomicInteger[] timesChosen, AtomicInteger totalUsers,
AtomicInteger totalSwitches) {
this.lb = lb;
this.numRequests = numRequests;
this.failedCount = failedCount;
this.timesChosen = timesChosen;
this.threadsDone = threadsDone;
this.totalUsers = totalUsers;
this.totalSwitches = totalSwitches;
ir = lb.getRule();
}
public void run() {
String fakeCustomer = null;
int requestsLeft = 0;
Server prevChoice = null;
boolean newUser = false;
for (int i = 0; i < numRequests; i++) {
if (fakeCustomer == null) {
requestIdx += rand.nextInt();
fakeCustomerBytes[0] = (byte) (requestIdx % 256);
fakeCustomerBytes[1] = (byte) ((requestIdx >> 8) % 256);
fakeCustomerBytes[2] = (byte) ((requestIdx >> 16) % 256);
fakeCustomerBytes[3] = (byte) ((requestIdx >> 24) % 256);
fakeCustomer = new String(fakeCustomerBytes);
/* variable number of requests per "user" */
requestsLeft = (rand.nextInt(3) + 1)
* (rand.nextInt(3) + 1) * (rand.nextInt(10) + 1);
totalUsers.getAndIncrement();
prevChoice = null;
newUser = true;
}
boolean failed = false;
boolean switched = false;
Server choice = lb.chooseServer(fakeCustomer);
if (!newUser) {
if (prevChoice == null) {
switched = true;
} else if ((choice != null)
&& (serverComparator.compare(choice, prevChoice) != 0)) {
totalSwitches.getAndIncrement();
switched = true;
}
}
newUser = false;
prevChoice = choice;
if (--requestsLeft == 0) {
/**
* Create a StikyRule and then uncomment this ... if (ir
* instanceof StickyRule) { ((StickyRule)
* ir).clearAssignment(fakeCustomer); }
**/
fakeCustomer = null;
}
if ((choice == null) || (!choice.isAlive())
|| (!isAliveMap.get(choice.getId()).booleanValue())) {
failed = true;
if (choice != null) {
/* notify load balancer */
lb.markServerDown(choice);
}
} else {
int hostNum = Integer.parseInt(choice.getHost());
timesChosen[hostNum].getAndIncrement();
}
if (failed) {
failedCount.getAndIncrement();
}
if (switched || failed) {
totalSwitches.getAndIncrement();
}
}
threadsDone.getAndIncrement();
}
}
static class PingFake implements IPing {
public boolean isAlive(Server server) {
Boolean res = isAliveMap.get(server.getId());
return ((res != null) && (res.booleanValue()));
}
}
}
......@@ -298,7 +298,7 @@ public class NFLoadBalancer extends AbstractLoadBalancer implements PrimeConnect
setupPingTask(); // since ping data changed
}
} else{
this.ping = ping;
this.ping = null;
//cancel the timer task
lbTimer.cancel();
}
......
package com.netflix.loadbalancer;
import java.util.concurrent.atomic.AtomicInteger;
public class PingThread extends Thread {
int index;
Object[] allServers;
boolean[] results;
AtomicInteger resultCounter;
IPing ping;
public PingThread() {
}
public PingThread(int index, Object[] allServers,
boolean[] results, AtomicInteger resultCounter,
IPing ping) {
this.index = index;
this.allServers = allServers;
this.results = results;
this.resultCounter = resultCounter;
this.ping = ping;
}
public void run() {
try {
results[index] = ping.isAlive((Server) allServers[index]);
} catch (Throwable t) {
results[index] = false;
} finally {
resultCounter.incrementAndGet();
}
}
}
......@@ -12,6 +12,7 @@ public class RandomRule implements IRule {
/*
* Randomly choose from all living servers
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
public Server choose(NFLoadBalancer lb, Object key) {
if (lb == null) {
return null;
......
......@@ -106,14 +106,12 @@ public class ResponseTimeWeightedRule implements IRule {
final static boolean availableOnly = false;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
public Server choose(NFLoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
if (lb == null) {
return null;
}
while (server == null) {
if (Thread.interrupted()) {
......
package com.netflix.loadbalancer;
import java.io.Serializable;
import java.util.Comparator;
public class ServerComparator implements Comparator<Server> {
public int compare(Server s1, Server s2) {
public class ServerComparator implements Comparator<Server>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
public int compare(Server s1, Server s2) {
return s1.getHostPort().compareTo(s2.getId());
}
}
......@@ -54,8 +54,6 @@ public class ServerStats {
private AtomicLong totalCircuitBreakerBlackOutPeriod = new AtomicLong(0);
private volatile long lastAccessedTimestamp;
private volatile long firstConnectionTimestamp = 0;
private String serverId;
public ServerStats() {
connectionFailureThreshold = DynamicPropertyFactory.getInstance().getIntProperty(
......@@ -85,19 +83,6 @@ public class ServerStats {
publisher.start();
}
this.server = server;
this.serverId = server.getId();
// Do not register as monitor object since it will add hundreds of epic metric
// for RestClient which talks to a cluster with ~100 instances
// Also, metric on each individual server does not help too much
/*
try{
MonitorRegistry.getInstance().registerObject(this);
}catch(Throwable t){
// likely that the server was previously registered
// and there is no easy way to find out if it already exists
// so just gulping this down
}
*/
}
private int getBufferSize() {
......
......@@ -133,10 +133,9 @@ public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T
tracer = Monitors.newTimer(this.getClass().getName() + "_ExecutionTimer", TimeUnit.MILLISECONDS);
}
do {
Stopwatch w = null;
noteOpenConnection(serverStats, request);
Stopwatch w = tracer.start();
try {
noteOpenConnection(serverStats, request);
w = tracer.start();
response = execute(request);
done = true;
} catch (Exception e) {
......@@ -382,8 +381,7 @@ public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T
protected Pair<String, Integer> deriveHostAndPortFromVipAddress(String vipAddress)
throws URISyntaxException, NIWSClientException {
Pair<String, Integer> hostAndPort = new Pair<String, Integer>("",
new Integer(getDefaultPort()));
Pair<String, Integer> hostAndPort = new Pair<String, Integer>(null, -1);
URI uri = new URI(vipAddress);
String scheme = uri.getScheme();
if (scheme == null) {
......
......@@ -30,9 +30,9 @@ public class ClientFactory {
try {
String clientClassName = (String) niwsClientConfig.getProperty(NiwsClientConfigKey.ClientClassName);
client = (AbstractLoadBalancerAwareClient<?, ?>) instantiateNiwsConfigAwareClassInstance(clientClassName, niwsClientConfig);
boolean initializeNFLoadBalancer = Boolean.valueOf("" + niwsClientConfig.getProperty(
boolean initializeNFLoadBalancer = Boolean.parseBoolean(niwsClientConfig.getProperty(
NiwsClientConfigKey.InitializeNFLoadBalancer,
Boolean.valueOf(NiwsClientConfig.DEFAULT_ENABLE_LOADBALANCER)));
NiwsClientConfig.DEFAULT_ENABLE_LOADBALANCER).toString());
if (initializeNFLoadBalancer) {
loadBalancer = (AbstractLoadBalancer) getNamedLoadBalancer(restClientName);
}
......
......@@ -50,7 +50,7 @@ public class ClientRequest implements Cloneable {
}
public boolean isRetriable() {
return (isRetriable == Boolean.TRUE);
return (Boolean.TRUE.equals(isRetriable));
}
protected final ClientRequest setRetriable(boolean isRetriable) {
......
......@@ -90,9 +90,9 @@ public class NiwsClientConfig {
public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.niws.client.DiscoveryEnabledNIWSServerList";
public static int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30*1000; // every half minute (30 secs)
public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30*1000; // every half minute (30 secs)
public static int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30*1000; // all connections idle for 30 secs
public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30*1000; // all connections idle for 30 secs
......@@ -253,6 +253,9 @@ public class NiwsClientConfig {
@Override
public boolean equals(Object other){
if (other == null) {
return false;
}
if (getClass() == other.getClass()) {
return toString().equals(other.toString());
}
......@@ -456,6 +459,7 @@ public class NiwsClientConfig {
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DC_DOUBLECHECK")
private VipAddressResolver getVipAddressResolver() {
if (resolver == null) {
synchronized (this) {
......@@ -668,8 +672,11 @@ public class NiwsClientConfig {
Map<String, HttpVerbUriRegexPropertyValue> aliasMap = configMapForPrefix.get(getClientName());
if (aliasMap == null) {
aliasMap = new ConcurrentHashMap<String, HttpVerbUriRegexPropertyValue>();
configMapForPrefix.put(getClientName(),
Map<String, HttpVerbUriRegexPropertyValue> prev = configMapForPrefix.putIfAbsent(getClientName(),
aliasMap);
if (prev != null) {
aliasMap = prev;
}
}
aliasMap.put(alias.trim(),
HttpVerbUriRegexPropertyValue
......@@ -746,8 +753,11 @@ public class NiwsClientConfig {
if (aliasRuleMapForClient == null) {
// no map exists so far, create one
aliasRuleMapForClient = new ConcurrentHashMap<String, HttpVerbUriRegexPropertyValue>();
dynamicConfigMap.get(prefix).put(clientName,
Map<String, HttpVerbUriRegexPropertyValue> prev = dynamicConfigMap.get(prefix).putIfAbsent(clientName,
aliasRuleMapForClient);
if (prev != null) {
aliasRuleMapForClient = prev;
}
}
String alias = name.substring(configPrefix.length());
......
......@@ -359,7 +359,7 @@ public class PrimeConnections {
logger.debug("Sleeping for " + sleep + "ms ...");
Thread.sleep(sleep); // making this seconds based is too slow
// i.e. 200ms, 400 ms, 800ms, 1600ms etc.
} catch (Exception ex) {
} catch (InterruptedException ex) {
}
}
......
......@@ -49,9 +49,9 @@ public class ZoneAwareNIWSDiscoveryLoadBalancer<T extends Server> extends NIWSDi
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
for (String zone: zoneServersMap.keySet()) {
zone = zone.toLowerCase();
getLoadBalancer(zone).setServersList(zoneServersMap.get(zone));
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
}
......@@ -116,11 +116,13 @@ public class ZoneAwareNIWSDiscoveryLoadBalancer<T extends Server> extends NIWSDi
NFLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
loadBalancer = new NFLoadBalancer(this.getName() + "_" + zone, this.getRule(), this.getLoadBalancerStats());
balancers.put(zone, loadBalancer);
return balancers.get(zone);
} else {
return loadBalancer;
}
NFLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
return loadBalancer;
}
/**
......@@ -177,8 +179,9 @@ public class ZoneAwareNIWSDiscoveryLoadBalancer<T extends Server> extends NIWSDi
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
for (String zone: snapshot.keySet()) {
ZoneSnapshot zoneSnapshot = snapshot.get(zone);
for (Map.Entry<String, ZoneSnapshot> entry: snapshot.entrySet()) {
String zone = entry.getKey();
ZoneSnapshot zoneSnapshot = entry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册