提交 9222fdbc 编写于 作者: K kohsuke

completed pipe implementation.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1303 71c3de6d-444a-0410-be80-ed276b4c234a
上级 8487176c
......@@ -41,7 +41,7 @@ public class Channel {
/**
* Objects exported via {@link #export(Class, Object)}.
*/
/*package*/ final ExportTable<Object> exportedObjects = new ExportTable<Object>();
private final ExportTable<Object> exportedObjects = new ExportTable<Object>();
/**
*
......@@ -97,13 +97,29 @@ public class Channel {
return null;
// TODO: unexport
final int id = exportedObjects.intern(instance);
final int id = export(instance);
return type.cast(Proxy.newProxyInstance( type.getClassLoader(), new Class[]{type},
new RemoteInvocationHandler(id)));
}
/*package*/ int export(Object instance) {
return exportedObjects.intern(instance);
}
/*package*/ Object getExportedObject(int oid) {
return exportedObjects.get(oid);
}
/**
* Makes a remote procedure call.
*
* <p>
* Sends {@link Callable} to the remote system, executes it, and returns its result.
*
* @throws InterruptedException
* If the current thread is interrupted while waiting for the completion.
* @throws IOException
* If there's any error in the communication between {@link Channel}s.
*/
public <V extends Serializable,T extends Throwable>
V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
......@@ -120,6 +136,10 @@ public class Channel {
/**
* Makes an asynchronous remote procedure call.
*
* <p>
* Similar to {@link #call(Callable)} but returns immediately.
* The result of the {@link Callable} can be obtained through the {@link Future} object.
*/
public <V extends Serializable,T extends Throwable>
Future<V> callAsync(final Callable<V,T> callable) throws IOException, T, InterruptedException {
......@@ -149,7 +169,9 @@ public class Channel {
}
/**
* Waits for the close down of this {@link Channel}.
* Waits for this {@link Channel} to be closed down.
*
* The close-down of a {@link Channel} might be initiated locally or remotely.
*/
public synchronized void join() throws InterruptedException {
while(!closed)
......
package hudson.remoting;
import java.util.concurrent.Executors;
/**
* Entry point for running a {@link Channel} that uses stdin/stdout.
*
* This can be used as the main class for launching a channel on
* a separate JVM.
*
* @author Kohsuke Kawaguchi
*/
public class Launcher {
public static void main(String[] args) throws Exception {
Channel channel = new Channel("channel", Executors.newCachedThreadPool(), System.in, System.out);
channel.join();
System.exit(0);
}
}
......@@ -9,10 +9,32 @@ import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Serializable;
import java.io.ByteArrayOutputStream;
import java.util.logging.Logger;
/**
* Pipe for the remote {@link Callable} and the local program to talk to each other.
*
* <p>
* There are two kinds of pipes. One is for having a local system write to a remote system,
* and the other is for having a remote system write to a local system. Use
* the different versions of the <tt>create</tt> method to create the appropriate kind
* of pipes.
*
* <p>
* Once created, {@link Pipe} can be sent to the remote system as a part of
* {@link Callable} or any other object transportation mechanism between {@link Channel}s.
* Once re-instanciated on the remote {@link Channel}, pipe connects
* back to the local instance and perform necessary set up.
*
* <p>
* The local and remote system can then call {@link #getIn()} and {@link #getOut()} to
* read/write bytes.
*
* <p>
* Pipe can be only written by one system and read by one system. It is an error to
* send one {@link Pipe} to two remote {@link Channel}s, or send one {@link Pipe} to
* the same {@link Channel} twice.
*
*
* <h2>Implementation Note</h2>
......@@ -21,6 +43,8 @@ import java.io.Serializable;
* to send data, instead of typed proxy object. This allows the writer to send data
* without blocking until the arrival of the data is confirmed.
*
* TODO: unexport
*
* @author Kohsuke Kawaguchi
*/
public final class Pipe implements Serializable {
......@@ -32,30 +56,49 @@ public final class Pipe implements Serializable {
this.out = out;
}
/**
* Gets the reading end of the pipe.
*/
public InputStream getIn() {
return in;
}
/**
* Gets the writing end of the pipe.
*/
public OutputStream getOut() {
return out;
}
public static Pipe create() {
/**
* Creates a {@link Pipe} that allows remote system to write and local system to read.
*/
public static Pipe createRemoteToLocal() {
// OutputStream will be created on the target
return new Pipe(new PipedInputStream(),null);
}
/**
* Creates a {@link Pipe} that allows local system to write and remote system to read.
*/
public static Pipe createLocalToRemote() {
return new Pipe(null,new RemoteOutputStream());
}
private void writeObject(ObjectOutputStream oos) throws IOException {
if(in!=null && out==null) {
// remote will write to local
PipedOutputStream pos = new PipedOutputStream((PipedInputStream)in);
int oid = Channel.current().exportedObjects.intern(pos);
int oid = Channel.current().export(pos);
oos.writeBoolean(true); // marker
oos.writeInt(oid);
} else {
// TODO: remote will read from local
throw new UnsupportedOperationException();
// remote will read from local
int oid = Channel.current().export(out);
oos.writeBoolean(false);
oos.writeInt(oid);
}
}
......@@ -65,92 +108,176 @@ public final class Pipe implements Serializable {
if(ois.readBoolean()) {
// local will write to remote
final int oid = ois.readInt();
in = null;
out = new BufferedOutputStream(new OutputStream() {
public void write(int b) throws IOException {
write(new byte[]{(byte)b},0,1);
}
out = new BufferedOutputStream(new RemoteOutputStream(channel, ois.readInt()));
} else {
// local will read from remote.
// tell the remote system about this local read pipe
public void write(byte b[], int off, int len) throws IOException {
if(off==0 && len==b.length)
write(b);
else {
byte[] buf = new byte[len];
System.arraycopy(b,off,buf,0,len);
write(buf);
}
}
// this is the OutputStream that wants to send data to us
final int oidRos = ois.readInt();
public void write(byte b[]) throws IOException {
channel.send(new Chunk(oid,b));
}
// we want 'oidRos' to send data to this PipedOutputStream
PipedOutputStream pos = new PipedOutputStream();
final int oidPos = channel.export(pos);
public void close() throws IOException {
channel.send(new EOF(oid));
}
});
} else {
// TODO
throw new UnsupportedOperationException();
// tell 'ros' to connect to our 'pos'.
channel.send(new ConnectCommand(oidRos, oidPos));
out = null;
in = new PipedInputStream(pos);
}
}
private static final long serialVersionUID = 1L;
/**
* {@link Command} for sending bytes.
* {@link OutputStream} that sends bits to a remote object.
*/
private static final class Chunk extends Command {
private final int oid;
private final byte[] buf;
private static class RemoteOutputStream extends OutputStream {
private Channel channel;
private int oid;
/**
* If bytes are written to this stream before it's connected
* to a remote object, bytes will be stored in this buffer.
*/
private ByteArrayOutputStream tmp;
/**
* Set to true if the stream is closed before it's connected
* to a remote object.
*/
private boolean closed;
public Chunk(int oid, byte[] buf) {
public RemoteOutputStream() {
}
public RemoteOutputStream(Channel channel, int oid) throws IOException {
connect(channel,oid);
}
/**
* Connects this stream to the specified remote object.
*/
private synchronized void connect(Channel channel, int oid) throws IOException {
if(this.channel!=null)
throw new IllegalStateException("Cannot connect twice");
this.channel = channel;
this.oid = oid;
this.buf = buf;
// if we already have bytes to write, do so now.
if(tmp!=null) {
write(tmp.toByteArray());
tmp = null;
}
if(closed)
close();
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.exportedObjects.get(oid);
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
public void write(int b) throws IOException {
write(new byte[]{(byte)b},0,1);
}
public void write(byte b[], int off, int len) throws IOException {
if(off==0 && len==b.length)
write(b);
else {
byte[] buf = new byte[len];
System.arraycopy(b,off,buf,0,len);
write(buf);
}
}
public String toString() {
return "Pipe.Chunk("+oid+","+buf.length+")";
public synchronized void write(byte b[]) throws IOException {
if(channel==null) {
if(tmp==null)
tmp = new ByteArrayOutputStream();
tmp.write(b);
} else {
channel.send(new Chunk(oid,b));
}
}
private static final long serialVersionUID = 1L;
}
public synchronized void close() throws IOException {
if(channel==null)
closed = true;
else
channel.send(new EOF(oid));
}
/**
* {@link Command} for sending EOF.
*/
private static final class EOF extends Command {
private final int oid;
/**
* {@link Command} for sending bytes.
*/
private static final class Chunk extends Command {
private final int oid;
private final byte[] buf;
public EOF(int oid) {
this.oid = oid;
public Chunk(int oid, byte[] buf) {
this.oid = oid;
this.buf = buf;
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.getExportedObject(oid);
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
}
}
public String toString() {
return "Pipe.Chunk("+oid+","+buf.length+")";
}
private static final long serialVersionUID = 1L;
}
/**
* {@link Command} for sending EOF.
*/
private static final class EOF extends Command {
private final int oid;
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.exportedObjects.get(oid);
try {
os.close();
} catch (IOException e) {
// ignore errors
public EOF(int oid) {
this.oid = oid;
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.getExportedObject(oid);
try {
os.close();
} catch (IOException e) {
// ignore errors
}
}
public String toString() {
return "Pipe.EOF("+oid+")";
}
private static final long serialVersionUID = 1L;
}
}
public String toString() {
return "Pipe.EOF("+oid+")";
private static final Logger logger = Logger.getLogger(Pipe.class.getName());
private static class ConnectCommand extends Command {
private final int oidRos;
private final int oidPos;
public ConnectCommand(int oidRos, int oidPos) {
this.oidRos = oidRos;
this.oidPos = oidPos;
}
private static final long serialVersionUID = 1L;
protected void execute(Channel channel) {
try {
RemoteOutputStream ros = (RemoteOutputStream) channel.getExportedObject(oidRos);
ros.connect(channel, oidPos);
} catch (IOException e) {
logger.severe("Failed to connect to pipe");
}
}
}
private static final long serialVersionUID = 1L;
}
......@@ -62,10 +62,8 @@ final class RemoteInvocationHandler implements InvocationHandler, Serializable {
RemoteInvocationHandler that = (RemoteInvocationHandler) o;
if (oid != that.oid) return false;
if (channel!=that.channel) return false;
return this.oid==that.oid && this.channel==that.channel;
return true;
}
public int hashCode() {
......@@ -104,7 +102,7 @@ final class RemoteInvocationHandler implements InvocationHandler, Serializable {
}
protected Serializable perform(Channel channel) throws Throwable {
Object o = channel.exportedObjects.get(oid);
Object o = channel.getExportedObject(oid);
if(o==null)
throw new IllegalStateException("Unable to call "+methodName+". Invalid object ID "+oid);
try {
......
......@@ -14,14 +14,10 @@ public class PipeTest extends RmiTestBase {
* Test the "remote-write local-read" pipe.
*/
public void testRemoteWrite() throws Exception {
Pipe p = Pipe.create();
Pipe p = Pipe.createRemoteToLocal();
Future<Integer> f = channel.callAsync(new WritingCallable(p));
InputStream in = p.getIn();
for( int cnt=0; cnt<256*256; cnt++ )
assertEquals(cnt/256,in.read());
assertEquals(-1,in.read());
in.close();
read(p);
int r = f.get();
System.out.println("result=" + r);
......@@ -36,15 +32,66 @@ public class PipeTest extends RmiTestBase {
}
public Integer call() throws IOException {
OutputStream os = pipe.getOut();
byte[] buf = new byte[384];
for( int i=0; i<256; i++ ) {
Arrays.fill(buf,(byte)i);
os.write(buf,0,256);
}
os.close();
write(pipe);
return 5;
}
}
/**
* Test the "local-write remote-read" pipe.
*/
public void testLocalWrite() throws Exception {
Pipe p = Pipe.createLocalToRemote();
Future<Integer> f = channel.callAsync(new ReadingCallable(p));
write(p);
int r = f.get();
System.out.println("result=" + r);
assertEquals(5,r);
}
public void testLocalWrite2() throws Exception {
Pipe p = Pipe.createLocalToRemote();
Future<Integer> f = channel.callAsync(new ReadingCallable(p));
Thread.sleep(2000); // wait for remote to connect to local.
write(p);
int r = f.get();
System.out.println("result=" + r);
assertEquals(5,r);
}
private static class ReadingCallable implements Callable<Integer, IOException> {
private final Pipe pipe;
public ReadingCallable(Pipe pipe) {
this.pipe = pipe;
}
public Integer call() throws IOException {
read(pipe);
return 5;
}
}
private static void write(Pipe pipe) throws IOException {
OutputStream os = pipe.getOut();
byte[] buf = new byte[384];
for( int i=0; i<256; i++ ) {
Arrays.fill(buf,(byte)i);
os.write(buf,0,256);
}
os.close();
}
private static void read(Pipe p) throws IOException {
InputStream in = p.getIn();
for( int cnt=0; cnt<256*256; cnt++ )
assertEquals(cnt/256,in.read());
assertEquals(-1,in.read());
in.close();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册