提交 16ff3492 编写于 作者: K kohsuke

Working on a fix for pipe clogging problem.

git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@34837 71c3de6d-444a-0410-be80-ed276b4c234a
上级 bfd3c0e1
......@@ -46,6 +46,15 @@ public final class Capability implements Serializable {
return (mask&MASK_MULTI_CLASSLOADER)!=0;
}
/**
* Does the implementation supports window size control over pipes?
*
* @see ProxyOutputStream
*/
public boolean supportsPipeThrottling() {
return (mask& MASK_PIPE_THROTTLING)!=0;
}
/**
* Writes out the capacity preamble.
*/
......@@ -89,6 +98,11 @@ public final class Capability implements Serializable {
*/
private static final long MASK_MULTI_CLASSLOADER = 2L;
/**
* Bit that indicates the use of TCP-like window control for {@link ProxyOutputStream}.
*/
private static final long MASK_PIPE_THROTTLING = 4L;
static final byte[] PREAMBLE;
static {
......
package hudson.remoting;
/**
* @author Kohsuke Kawaguchi
*/
abstract class PipeWindow {
abstract void increase(int delta);
abstract int peek();
/**
* Blocks until some space becomes available.
*/
abstract int get() throws InterruptedException;
abstract void decrease(int delta);
/**
* Fake implementation used when the receiver side doesn't support throttling.
*/
PipeWindow FAKE = new PipeWindow() {
void increase(int delta) {
}
int peek() {
return Integer.MAX_VALUE;
}
int get() throws InterruptedException {
return Integer.MAX_VALUE;
}
void decrease(int delta) {
}
};
static class Real extends PipeWindow {
private int available;
public synchronized void increase(int delta) {
available += delta;
notifyAll();
}
public synchronized int peek() {
return available;
}
/**
* Blocks until some space becomes available.
*/
public synchronized int get() throws InterruptedException {
while (available==0)
wait();
return available;
}
public synchronized void decrease(int delta) {
available -= delta;
if (available<0)
throw new AssertionError();
}
}
}
......@@ -25,6 +25,7 @@ package hudson.remoting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
/**
......@@ -35,6 +36,8 @@ final class ProxyOutputStream extends OutputStream {
private Channel channel;
private int oid;
private PipeWindow window;
/**
* If bytes are written to this stream before it's connected
* to a remote object, bytes will be stored in this buffer.
......@@ -76,10 +79,13 @@ final class ProxyOutputStream extends OutputStream {
this.channel = channel;
this.oid = oid;
window = channel.getPipeWindow(oid);
// if we already have bytes to write, do so now.
if(tmp!=null) {
channel.send(new Chunk(oid,tmp.toByteArray()));
byte[] b = tmp.toByteArray();
tmp = null;
write(b);
}
if(closed) // already marked closed?
doClose();
......@@ -109,7 +115,21 @@ final class ProxyOutputStream extends OutputStream {
tmp = new ByteArrayOutputStream();
tmp.write(b);
} else {
channel.send(new Chunk(oid,b));
int sendable;
try {
sendable = Math.min(window.get(),b.length);
} catch (InterruptedException e) {
throw (IOException)new InterruptedIOException().initCause(e);
}
if (sendable==b.length) {
window.decrease(sendable);
channel.send(new Chunk(oid,b));
} else {
// fill the sender window size now, and send the rest in a separate chunk
write(b,0,sendable);
write(b,sendable,b.length-sendable);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册