提交 1e87dd07 编写于 作者: K kohsuke

reimplemented pipe for better performance. test now runs in 2.3sec down from 17sec before.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1300 71c3de6d-444a-0410-be80-ed276b4c234a
上级 04b15b9a
......@@ -63,6 +63,10 @@ public class Channel {
/**
* Sends a command to the remote end and executes it there.
*
* <p>
* This is the lowest layer of abstraction in {@link Channel}.
* {@link Command}s are executed on a remote system in the order they are sent.
*/
/*package*/ synchronized void send(Command cmd) throws IOException {
if(closed)
......
......@@ -11,6 +11,16 @@ import java.io.PipedOutputStream;
import java.io.Serializable;
/**
* Pipe for the remote {@link Callable} and the local program to talk to each other.
*
*
*
* <h2>Implementation Note</h2>
* <p>
* For better performance, {@link Pipe} uses lower-level {@link Command} abstraction
* to send data, instead of typed proxy object. This allows the writer to send data
* without blocking until the arrival of the data is confirmed.
*
* @author Kohsuke Kawaguchi
*/
public final class Pipe implements Serializable {
......@@ -38,19 +48,11 @@ public final class Pipe implements Serializable {
private void writeObject(ObjectOutputStream oos) throws IOException {
if(in!=null && out==null) {
// remote will write to local
IStream proxy = Channel.current().export(IStream.class, new IStream() {
PipedOutputStream pos = new PipedOutputStream((PipedInputStream)in);
public void write(byte[] buf) throws IOException {
pos.write(buf);
}
public void close() throws IOException {
pos.close();
}
});
PipedOutputStream pos = new PipedOutputStream((PipedInputStream)in);
int oid = Channel.current().exportedObjects.intern(pos);
oos.writeBoolean(true); // marker
oos.writeObject(proxy);
oos.writeInt(oid);
} else {
// TODO: remote will read from local
throw new UnsupportedOperationException();
......@@ -63,7 +65,8 @@ public final class Pipe implements Serializable {
if(ois.readBoolean()) {
// local will write to remote
final IStream proxy = (IStream)ois.readObject();
final int oid = ois.readInt();
in = null;
out = new BufferedOutputStream(new OutputStream() {
public void write(int b) throws IOException {
......@@ -81,11 +84,11 @@ public final class Pipe implements Serializable {
}
public void write(byte b[]) throws IOException {
proxy.write(b);
channel.send(new Chunk(oid,b));
}
public void close() throws IOException {
proxy.close();
channel.send(new EOF(oid));
}
});
} else {
......@@ -94,9 +97,59 @@ public final class Pipe implements Serializable {
}
}
private static interface IStream {
void write(byte[] buf) throws IOException;
void close() throws IOException;
/**
* {@link Command} for sending bytes.
*/
private static final class Chunk extends Command {
private final int oid;
private final byte[] buf;
public Chunk(int oid, byte[] buf) {
this.oid = oid;
this.buf = buf;
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.exportedObjects.get(oid);
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
}
}
public String toString() {
return "Pipe.Chunk("+oid+","+buf.length+")";
}
private static final long serialVersionUID = 1L;
}
/**
* {@link Command} for sending EOF.
*/
private static final class EOF extends Command {
private final int oid;
public EOF(int oid) {
this.oid = oid;
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.exportedObjects.get(oid);
try {
os.close();
} catch (IOException e) {
// ignore errors
}
}
public String toString() {
return "Pipe.EOF("+oid+")";
}
private static final long serialVersionUID = 1L;
}
private static final long serialVersionUID = 1L;
......
......@@ -34,17 +34,18 @@ final class RemoteInvocationHandler implements InvocationHandler, Serializable {
if(args==null) args = EMPTY_ARRAY;
if(method.getDeclaringClass()==Object.class) {
Class<?> dc = method.getDeclaringClass();
if(dc ==Object.class) {
// handle equals and hashCode by ourselves
try {
return method.invoke(this,args);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
} else {
// delegate the rest of the methods to the remote object
return new RPCRequest(oid,method,args).call(channel);
}
// delegate the rest of the methods to the remote object
return new RPCRequest(oid,method,args).call(channel);
}
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册