提交 2b1d1753 编写于 作者: K kohsuke

implemented asynchronous invocation.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1297 71c3de6d-444a-0410-be80-ed276b4c234a
上级 91879ec8
......@@ -3,8 +3,14 @@ package hudson.remoting;
import java.io.Serializable;
/**
* Represents computation to be done on a remote system.
*
* @author Kohsuke Kawaguchi
*/
public interface Callable<V,T extends Throwable> extends Serializable {
/**
* Performs computation and returns the result,
* or throws some exception.
*/
V call() throws T;
}
......@@ -9,7 +9,9 @@ import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -112,6 +114,25 @@ public class Channel {
}
}
/**
* Makes an asynchronous remote procedure call.
*/
public <V extends Serializable,T extends Throwable>
Future<V> callAsync(final Callable<V,T> callable) throws IOException, T, InterruptedException {
final Future<UserResponse<V>> f = new UserRequest<V, T>(this, callable).callAsync(this);
return new FutureAdapter<V,UserResponse<V>>(f) {
protected V adapt(UserResponse<V> r) throws ExecutionException {
try {
return r.retrieve(Channel.this,callable.getClass().getClassLoader());
} catch (IOException e) {
throw new ExecutionException(e);
} catch (ClassNotFoundException e) {
throw new ExecutionException(e);
}
}
};
}
private synchronized void terminate(IOException e) {
// abort
closed = true;
......
package hudson.remoting;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* {@link Future} that converts the return type.
*
* @author Kohsuke Kawaguchi
*/
abstract class FutureAdapter<X,Y> implements Future<X> {
protected final Future<Y> core;
protected FutureAdapter(Future<Y> core) {
this.core = core;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return core.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return core.isCancelled();
}
public boolean isDone() {
return core.isDone();
}
public X get() throws InterruptedException, ExecutionException {
return adapt(core.get());
}
public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return adapt(core.get(timeout, unit));
}
protected abstract X adapt(Y y) throws ExecutionException;
}
......@@ -4,6 +4,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Request/response pattern over {@link Command}.
......@@ -34,7 +38,7 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
*/
private final int id;
private Response<RSP,EXC> response;
private volatile Response<RSP,EXC> response;
protected Request() {
synchronized(Request.class) {
......@@ -46,9 +50,10 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
* Sends this request to a remote system, and blocks until we receives a response.
*/
public synchronized final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
response=null;
channel.pendingCalls.put(id,this);
channel.send(this);
response=null;
while(response==null)
wait(); // wait until the response arrives
......@@ -58,6 +63,64 @@ abstract class Request<RSP extends Serializable,EXC extends Throwable> extends C
return response.returnValue;
}
/**
* Makes an invocation but immediately returns without waiting for the completion
* (AKA aysnchronous invocation.)
*
* @return
* The {@link Future} object that can be used to wait for the completion.
*/
public final Future<RSP> callAsync(Channel channel) throws EXC, InterruptedException, IOException {
response=null;
channel.pendingCalls.put(id,this);
channel.send(this);
return new Future<RSP>() {
/**
* The task cannot be cancelled.
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return response!=null;
}
public RSP get() throws InterruptedException, ExecutionException {
synchronized(Request.this) {
while(response==null)
Request.this.wait(); // wait until the response arrives
if(response.exception!=null)
throw new ExecutionException(response.exception);
return response.returnValue;
}
}
public RSP get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
synchronized(Request.this) {
if(response==null)
Request.this.wait(unit.toMillis(timeout)); // wait until the response arrives
if(response==null)
throw new TimeoutException();
if(response.exception!=null)
throw new ExecutionException(response.exception);
return response.returnValue;
}
}
};
}
/**
* Called by the {@link Response} when we received it.
*/
......
package hudson.remoting;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
/**
* @author Kohsuke Kawaguchi
*/
......@@ -7,6 +10,13 @@ public class SimpleTest extends RmiTestBase {
public void test1() throws Exception {
int r = channel.call(new Callable1());
System.out.println("result=" + r);
assertEquals(5,r);
}
public void test1Async() throws Exception {
Future<Integer> r = channel.callAsync(new Callable1());
System.out.println("result="+r.get());
assertEquals(5,(int)r.get());
}
private static class Callable1 implements Callable<Integer, RuntimeException> {
......@@ -16,6 +26,7 @@ public class SimpleTest extends RmiTestBase {
}
}
public void test2() throws Exception {
try {
channel.call(new Callable2());
......@@ -25,6 +36,16 @@ public class SimpleTest extends RmiTestBase {
}
}
public void test2Async() throws Exception {
try {
Future<Integer> r = channel.callAsync(new Callable2());
r.get();
fail();
} catch (ExecutionException e) {
assertEquals(e.getCause().getMessage(),"foo");
}
}
private static class Callable2 implements Callable<Integer, RuntimeException> {
public Integer call() throws RuntimeException {
throw new RuntimeException("foo");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册