提交 7ab81687 编写于 作者: S Stephen Connolly

Merge pull request #1785 from stephenc/pr-1777-is-better-when-lockfree

[FIXES PR#1777] Make accessing pendingLaunches lock free
......@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
......@@ -122,8 +123,8 @@ public class NodeProvisioner {
*/
private final Label label;
@GuardedBy("provisioningLock")
private final List<PlannedNode> pendingLaunches = new ArrayList<PlannedNode>();
private final AtomicReference<List<PlannedNode>> pendingLaunches
= new AtomicReference<List<PlannedNode>>(new ArrayList<PlannedNode>());
private final Lock provisioningLock = new ReentrantLock();
......@@ -155,12 +156,7 @@ public class NodeProvisioner {
* @since 1.401
*/
public List<PlannedNode> getPendingLaunches() {
provisioningLock.lock();
try {
return new ArrayList<PlannedNode>(pendingLaunches);
} finally {
provisioningLock.unlock();
}
return new ArrayList<PlannedNode>(pendingLaunches.get());
}
/**
......@@ -214,7 +210,8 @@ public class NodeProvisioner {
int plannedCapacitySnapshot = 0;
for (Iterator<PlannedNode> itr = pendingLaunches.iterator(); itr.hasNext(); ) {
List<PlannedNode> snapPendingLaunches = new ArrayList<PlannedNode>(pendingLaunches.get());
for (Iterator<PlannedNode> itr = snapPendingLaunches.iterator(); itr.hasNext(); ) {
PlannedNode f = itr.next();
if (f.future.isDone()) {
try {
......@@ -249,7 +246,29 @@ public class NodeProvisioner {
LOGGER.log(Level.SEVERE, "Unexpected uncaught exception encountered while "
+ "processing provisioned slave " + f.displayName, e);
} finally {
itr.remove();
while (true) {
List<PlannedNode> orig = pendingLaunches.get();
List<PlannedNode> repl = new ArrayList<PlannedNode>(orig);
// the contract for List.remove(o) is that the first element i where
// (o==null ? get(i)==null : o.equals(get(i)))
// is true will be removed from the list
// since PlannedNode.equals(o) is not final and we cannot assume
// that subclasses do not override with an equals which does not
// assure object identity comparison, we need to manually
// do the removal based on instance identity not equality
boolean changed = false;
for (Iterator<PlannedNode> iterator = repl.iterator(); iterator.hasNext(); ) {
PlannedNode p = iterator.next();
if (p == f) {
iterator.remove();
changed = true;
break;
}
}
if (!changed || pendingLaunches.compareAndSet(orig, repl)) {
break;
}
}
f.spent();
}
} else {
......@@ -355,7 +374,6 @@ public class NodeProvisioner {
* The current statistics snapshot for this {@link #label}.
*/
private final LoadStatistics.LoadStatisticsSnapshot snapshot;
private final List<PlannedNode> pendingLaunches;
/**
* The additional planned capacity for this {@link #label} and provisioned by previous strategies during the
* current updating of the {@link NodeProvisioner}.
......@@ -372,7 +390,6 @@ public class NodeProvisioner {
this.snapshot = snapshot;
this.label = label;
this.plannedCapacitySnapshot = plannedCapacitySnapshot;
pendingLaunches = NodeProvisioner.this.pendingLaunches;
}
/**
......@@ -428,13 +445,8 @@ public class NodeProvisioner {
* The additional planned capacity for this {@link #getLabel()} and provisioned by previous strategies during
* the current updating of the {@link NodeProvisioner}.
*/
public int getAdditionalPlannedCapacity() {
provisioningLock.lock();
try {
return additionalPlannedCapacity;
} finally {
provisioningLock.unlock();
}
public synchronized int getAdditionalPlannedCapacity() {
return additionalPlannedCapacity;
}
/**
......@@ -544,7 +556,7 @@ public class NodeProvisioner {
additionalPlannedCapacity += node.getNumExecutors();
}
} catch (InterruptedException e) {
// ignore, this will be caught by others later
// should never happen as we were told the future was done
} catch (ExecutionException e) {
// ignore, this will be caught by others later
}
......@@ -552,14 +564,18 @@ public class NodeProvisioner {
additionalPlannedCapacity += f.numExecutors;
}
}
provisioningLock.lock();
try {
pendingLaunches.addAll(plannedNodes);
if (additionalPlannedCapacity > 0) {
this.additionalPlannedCapacity += additionalPlannedCapacity;
while (!plannedNodes.isEmpty()) {
List<PlannedNode> orig = pendingLaunches.get();
List<PlannedNode> repl = new ArrayList<PlannedNode>(orig);
repl.addAll(plannedNodes);
if (pendingLaunches.compareAndSet(orig, repl)) {
if (additionalPlannedCapacity > 0) {
synchronized (this) {
this.additionalPlannedCapacity += additionalPlannedCapacity;
}
}
break;
}
} finally {
provisioningLock.unlock();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册