提交 666f0b47 编写于 作者: K Kohsuke Kawaguchi

Fixing a bug.

pending was null as soon as task started running, so it was possible that another thread submits the task and have that executed while the first one was still running
上级 00336a1d
package jenkins.util;
import com.google.common.util.concurrent.SettableFuture;
import hudson.remoting.AtmostOneThreadExecutor;
import java.util.concurrent.Callable;
......@@ -41,7 +42,9 @@ public class AtmostOneTaskExecutor<V> {
* If a task is already submitted and pending execution, non-null.
* Guarded by "synchronized(this)"
*/
private Future<V> pending;
private SettableFuture<V> pending;
private SettableFuture<V> inprogress;
public AtmostOneTaskExecutor(ExecutorService base, Callable<V> task) {
this.base = base;
......@@ -57,18 +60,39 @@ public class AtmostOneTaskExecutor<V> {
// if a task is already pending, just join that
return pending;
pending = base.submit(new Callable<V>() {
pending = SettableFuture.create();
if (inprogress==null) {
// if the task isn't currently running, schedule one
run();
}
return pending;
}
private synchronized void run() {
base.submit(new Callable<Void>() {
@Override
public V call() throws Exception {
public Void call() throws Exception {
// before we get going, everyone who submits after this
// should form a next batch
synchronized (AtmostOneTaskExecutor.this) {
inprogress = pending;
pending = null;
}
return task.call();
try {
inprogress.set(task.call());
} catch (Throwable t) {
inprogress.setException(t);
} finally {
synchronized (AtmostOneTaskExecutor.this) {
// if next one is pending, get that scheduled
inprogress = null;
if (pending!=null)
run();
}
}
return null;
}
});
return pending;
}
}
package jenkins.util
import hudson.util.OneShotEvent
import org.junit.Test
import java.util.concurrent.Callable
......@@ -14,29 +15,33 @@ import java.util.concurrent.atomic.AtomicInteger
class AtmostOneTaskExecutorTest {
def counter = new AtomicInteger()
def lock = new Object()
def lock = new OneShotEvent()
@Test
public void doubleBooking() {
synchronized (lock) {
def base = Executors.newCachedThreadPool()
def es = new AtmostOneTaskExecutor(base,
{ ->
counter.incrementAndGet()
synchronized (lock) {
lock.wait()
}
} as Callable);
es.submit()
while (counter.get()==0)
; // spin lock until executor gets to the choking point
def f = es.submit() // this should hang
Thread.sleep(500) // make sure the 2nd task is hanging
assert counter.get()==1
assert !f.isDone()
notifyAll() // let the first one go
}
def f1,f2;
def base = Executors.newCachedThreadPool()
def es = new AtmostOneTaskExecutor(base,
{ ->
counter.incrementAndGet()
lock.block()
} as Callable);
f1 = es.submit()
while (counter.get() == 0)
; // spin lock until executor gets to the choking point
f2 = es.submit() // this should hang
Thread.sleep(500) // make sure the 2nd task is hanging
assert counter.get() == 1
assert !f2.isDone()
lock.signal() // let the first one go
f1.get(); // first one should complete
// now 2nd one gets going and hits the choke point
f2.get()
assert counter.get()==2
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册