提交 4afa2a96 编写于 作者: K kohsuke

when a thread that's waiting for a remote operation is interrupted, also...

when a thread that's waiting for a remote operation is interrupted, also interrupt the remote thread that's executing it, so that the actual operation gets cancelled.
(#904)


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@5393 71c3de6d-444a-0410-be80-ed276b4c234a
上级 73ad35a3
......@@ -10,11 +10,13 @@ import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -73,7 +75,7 @@ public class Channel implements VirtualChannel {
private final ObjectInputStream ois;
private final ObjectOutputStream oos;
private final String name;
/*package*/ final Executor executor;
/*package*/ final ExecutorService executor;
/**
* If true, the incoming link is already shut down,
......@@ -88,6 +90,12 @@ public class Channel implements VirtualChannel {
/*package*/ final Map<Integer,Request<?,?>> pendingCalls = new Hashtable<Integer,Request<?,?>>();
/**
* Records the {@link Request}s being executed on this channel, sent by the remote peer.
*/
/*package*/ final Map<Integer,Request<?,?>> executingCalls =
Collections.synchronizedMap(new Hashtable<Integer,Request<?,?>>());
/**
* {@link ClassLoader}s that are proxies of the remote classloaders.
*/
......@@ -104,7 +112,7 @@ public class Channel implements VirtualChannel {
private final Vector<Listener> listeners = new Vector<Listener>();
private int gcCounter;
public Channel(String name, Executor exec, InputStream is, OutputStream os) throws IOException {
public Channel(String name, ExecutorService exec, InputStream is, OutputStream os) throws IOException {
this(name,exec,is,os,null);
}
......@@ -125,7 +133,7 @@ public class Channel implements VirtualChannel {
* when the established communication channel might include some data that might
* be useful for debugging/trouble-shooting.
*/
public Channel(String name, Executor exec, InputStream is, OutputStream os, OutputStream header) throws IOException {
public Channel(String name, ExecutorService exec, InputStream is, OutputStream os, OutputStream header) throws IOException {
this.name = name;
this.executor = exec;
......
......@@ -3,6 +3,7 @@ package hudson.remoting;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
......@@ -19,7 +20,6 @@ import java.util.logging.Logger;
* @see Response
*/
abstract class Request<RSP extends Serializable,EXC extends Throwable> extends Command {
/**
* Executed on a remote system to perform the task.
*
......@@ -43,6 +43,12 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
private volatile Response<RSP,EXC> response;
/**
* While executing the call this is set to the handle of the execution.
*/
private volatile transient Future<?> future;
protected Request() {
synchronized(Request.class) {
id = nextId++;
......@@ -77,8 +83,14 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
}
synchronized(this) {
while(response==null)
wait(); // wait until the response arrives
try {
while(response==null)
wait(); // wait until the response arrives
} catch (InterruptedException e) {
// if we are cancelled, abort the remote computation, too
channel.send(new Cancel(id));
throw e;
}
Object exc = response.exception;
......@@ -100,13 +112,13 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
* @throws IOException
* If there's an error during the communication.
*/
public final Future<RSP> callAsync(Channel channel) throws IOException {
public final hudson.remoting.Future<RSP> callAsync(final Channel channel) throws IOException {
response=null;
channel.pendingCalls.put(id,this);
channel.send(this);
return new Future<RSP>() {
return new hudson.remoting.Future<RSP>() {
/**
* The task cannot be cancelled.
*/
......@@ -124,8 +136,17 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
public RSP get() throws InterruptedException, ExecutionException {
synchronized(Request.this) {
while(response==null)
try {
while(response==null)
Request.this.wait(); // wait until the response arrives
} catch (InterruptedException e) {
try {
channel.send(new Cancel(id));
} catch (IOException e1) {
// couldn't cancel. ignore.
}
throw e;
}
if(response.exception!=null)
throw new ExecutionException(response.exception);
......@@ -170,7 +191,8 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
* Schedules the execution of this request.
*/
protected final void execute(final Channel channel) {
channel.executor.execute(new Runnable() {
channel.executingCalls.put(id,this);
future = channel.executor.submit(new Runnable() {
public void run() {
try {
RSP rsp;
......@@ -187,6 +209,8 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
// communication error.
// this means the caller will block forever
logger.log(Level.SEVERE, "Failed to send back a reply",e);
} finally {
channel.executingCalls.remove(id);
}
}
});
......@@ -214,4 +238,22 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
// throw new Error(e);
// }
//}
/**
* Interrupts the execution of the remote computation.
*/
private static final class Cancel extends Command {
private final int id;
Cancel(int id) {
this.id = id;
}
protected void execute(Channel channel) {
Request<?,?> r = channel.executingCalls.get(id);
if(r==null) return; // already completed
Future<?> f = r.future;
if(f!=null) f.cancel(true);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册