提交 5f51597a 编写于 作者: K kohsuke

[FIXED HUDSON-5977] implemented a windowing mechanism to throttle the pipe...

[FIXED HUDSON-5977] implemented a windowing mechanism to throttle the pipe usage, and the pipe operation no longer interferes with the main channel reader thread.

git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@34838 71c3de6d-444a-0410-be80-ed276b4c234a
上级 16ff3492
......@@ -33,7 +33,7 @@ public final class Capability implements Serializable {
}
Capability() {
this(MASK_MULTI_CLASSLOADER);
this(MASK_MULTI_CLASSLOADER|MASK_PIPE_THROTTLING);
}
/**
......@@ -105,6 +105,8 @@ public final class Capability implements Serializable {
static final byte[] PREAMBLE;
public static final Capability NONE = new Capability(0);
static {
try {
PREAMBLE = "<===[HUDSON REMOTING CAPACITY]===>".getBytes("UTF-8");
......
......@@ -24,6 +24,7 @@
package hudson.remoting;
import hudson.remoting.ExportTable.ExportList;
import hudson.remoting.PipeWindow.Real;
import hudson.remoting.forward.ListeningPort;
import hudson.remoting.forward.ForwarderFactory;
import hudson.remoting.forward.PortForwarder;
......@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;
import java.util.WeakHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -140,6 +142,16 @@ public class Channel implements VirtualChannel, IChannel {
*/
private final ExportTable<Object> exportedObjects = new ExportTable<Object>();
/**
* {@link PipeWindow}s keyed by their OIDs. Weak map, to simplify the GC of the instances.
* Strong references are kept from {@link ProxyOutputStream}.
*
* I haven't thought carefully, and it might be possible for us to lose the tracking of the window size
* in this way, but that will only result in a temporary spike in the effective window size,
* and therefore should be OK.
*/
private final WeakHashMap<Integer,PipeWindow> pipeWindows = new WeakHashMap<Integer, PipeWindow>();
/**
* Registered listeners.
*/
......@@ -208,6 +220,8 @@ public class Channel implements VirtualChannel, IChannel {
*/
private volatile long lastHeard;
/*package*/ final ExecutorService pipeWriter;
/**
* Communication mode.
* @since 1.161
......@@ -306,6 +320,10 @@ public class Channel implements VirtualChannel, IChannel {
* safe the resulting behavior is is up to discussion.
*/
public Channel(String name, ExecutorService exec, Mode mode, InputStream is, OutputStream os, OutputStream header, boolean restricted) throws IOException {
this(name,exec,mode,is,os,header,restricted,new Capability());
}
/*package*/ Channel(String name, ExecutorService exec, Mode mode, InputStream is, OutputStream os, OutputStream header, boolean restricted, Capability capability) throws IOException {
this.name = name;
this.executor = exec;
this.isRestricted = restricted;
......@@ -321,7 +339,7 @@ public class Channel implements VirtualChannel, IChannel {
//
// so use magic preamble and discard all the data up to that to improve robustness.
new Capability().writePreamble(os);
capability.writePreamble(os);
ObjectOutputStream oos = null;
if(mode!= Mode.NEGOTIATE) {
......@@ -361,6 +379,7 @@ public class Channel implements VirtualChannel, IChannel {
}
this.oos = oos;
this.remoteCapability = cap;
this.pipeWriter = createPipeWriter();
this.ois = new ObjectInputStream(mode.wrap(is));
new ReaderThread(name).start();
......@@ -402,6 +421,20 @@ public class Channel implements VirtualChannel, IChannel {
return outClosed!=null;
}
/**
* Creates the {@link ExecutorService} for writing to pipes.
*
* <p>
* If the throttling is supported, use a separate thread to free up the main channel
* reader thread (thus prevent blockage.) Otherwise let the channel reader thread do it,
* which is the historical behaviour.
*/
private ExecutorService createPipeWriter() {
if (remoteCapability.supportsPipeThrottling())
return Executors.newSingleThreadExecutor();
return new SynchronousExecutorService();
}
/**
* Sends a command to the remote end and executes it there.
*
......@@ -546,6 +579,19 @@ public class Channel implements VirtualChannel, IChannel {
return call(new PreloadJarTask(jars,local));
}
/*package*/ PipeWindow getPipeWindow(int oid) {
if (!remoteCapability.supportsPipeThrottling())
return PipeWindow.FAKE;
synchronized (pipeWindows) {
PipeWindow v = pipeWindows.get(oid);
if (v==null)
pipeWindows.put(oid,v = new Real(PIPE_WINDOW_SIZE));
return v;
}
}
/**
* {@inheritDoc}
*/
......@@ -894,6 +940,7 @@ public class Channel implements VirtualChannel, IChannel {
}
}
ois.close();
pipeWriter.shutdown();
} catch (IOException e) {
logger.log(Level.SEVERE, "I/O error in channel "+name,e);
terminate(e);
......@@ -926,6 +973,8 @@ public class Channel implements VirtualChannel, IChannel {
private static final Logger logger = Logger.getLogger(Channel.class.getName());
public static final int PIPE_WINDOW_SIZE = Integer.getInteger(Channel.class+".pipeWindowSize",128*1024);
// static {
// ConsoleHandler h = new ConsoleHandler();
// h.setFormatter(new Formatter(){
......
......@@ -175,14 +175,18 @@ public final class Pipe implements Serializable {
this.oidPos = oidPos;
}
protected void execute(Channel channel) {
protected void execute(final Channel channel) {
channel.pipeWriter.submit(new Runnable() {
public void run() {
try {
ProxyOutputStream ros = (ProxyOutputStream) channel.getExportedObject(oidRos);
final ProxyOutputStream ros = (ProxyOutputStream) channel.getExportedObject(oidRos);
channel.unexport(oidRos);
ros.connect(channel, oidPos);
} catch (IOException e) {
logger.log(Level.SEVERE,"Failed to connect to pipe",e);
}
}
});
}
}
}
/*
* The MIT License
*
* Copyright (c) 2010, InfraDNA, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package hudson.remoting;
import java.io.OutputStream;
/**
* Keeps track of the number of bytes that the sender can send without overwhelming the receiver of the pipe.
*
* <p>
* {@link OutputStream} is a blocking operation in Java, so when we send byte[] to the remote to write to
* {@link OutputStream}, it needs to be done in a separate thread (or else we'll fail to attend to the channel
* in timely fashion.) This in turn means the byte[] being sent needs to go to a queue between a
* channel reader thread and I/O processing thread, and thus in turn means we need some kind of throttling
* mechanism, or else the queue can grow too much.
*
* <p>
* This implementation solves the problem by using TCP/IP like window size tracking. The sender allocates
* a fixed length window size. Every time the sender sends something we reduce this value. When the receiver
* writes data to {@link OutputStream}, it'll send back the "ack" command, which adds to this value, allowing
* the sender to send more data.
*
* @author Kohsuke Kawaguchi
*/
abstract class PipeWindow {
......@@ -18,7 +58,7 @@ abstract class PipeWindow {
/**
* Fake implementation used when the receiver side doesn't support throttling.
*/
PipeWindow FAKE = new PipeWindow() {
static final PipeWindow FAKE = new PipeWindow() {
void increase(int delta) {
}
......@@ -38,6 +78,10 @@ abstract class PipeWindow {
static class Real extends PipeWindow {
private int available;
Real(int initialSize) {
this.available = initialSize;
}
public synchronized void increase(int delta) {
available += delta;
notifyAll();
......
......@@ -27,6 +27,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* {@link OutputStream} that sends bits to an exported
......@@ -85,7 +87,7 @@ final class ProxyOutputStream extends OutputStream {
if(tmp!=null) {
byte[] b = tmp.toByteArray();
tmp = null;
write(b);
_write(b);
}
if(closed) // already marked closed?
doClose();
......@@ -98,18 +100,29 @@ final class ProxyOutputStream extends OutputStream {
public void write(byte b[], int off, int len) throws IOException {
if(closed)
throw new IOException("stream is already closed");
_write(b, off, len);
}
private void _write(byte[] b, int off, int len) throws IOException {
if(off==0 && len==b.length)
write(b);
_write(b);
else {
byte[] buf = new byte[len];
System.arraycopy(b,off,buf,0,len);
write(buf);
_write(buf);
}
}
public synchronized void write(byte b[]) throws IOException {
if(closed)
throw new IOException("stream is already closed");
_write(b);
}
/**
* {@link #write(byte[])} without the close check.
*/
private void _write(byte[] b) throws IOException {
if(channel==null) {
if(tmp==null)
tmp = new ByteArrayOutputStream();
......@@ -127,8 +140,8 @@ final class ProxyOutputStream extends OutputStream {
channel.send(new Chunk(oid,b));
} else {
// fill the sender window size now, and send the rest in a separate chunk
write(b,0,sendable);
write(b,sendable,b.length-sendable);
_write(b,0,sendable);
_write(b,sendable,b.length-sendable);
}
}
}
......@@ -178,14 +191,28 @@ final class ProxyOutputStream extends OutputStream {
this.buf = buf;
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.getExportedObject(oid);
protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
public void run() {
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to write to stream",e);
} finally {
if (channel.remoteCapability.supportsPipeThrottling()) {
try {
channel.send(new Ack(oid,buf.length));
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to ack the stream",e);
}
}
}
}
});
}
public String toString() {
return "Pipe.Chunk("+oid+","+buf.length+")";
......@@ -206,13 +233,17 @@ final class ProxyOutputStream extends OutputStream {
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.getExportedObject(oid);
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
public void run() {
try {
os.flush();
} catch (IOException e) {
// ignore errors
}
}
});
}
public String toString() {
return "Pipe.Flush("+oid+")";
......@@ -234,9 +265,13 @@ final class ProxyOutputStream extends OutputStream {
this.oid = oid;
}
protected void execute(Channel channel) {
protected void execute(final Channel channel) {
channel.pipeWriter.submit(new Runnable() {
public void run() {
channel.unexport(oid);
}
});
}
public String toString() {
return "Pipe.Unexport("+oid+")";
......@@ -256,8 +291,10 @@ final class ProxyOutputStream extends OutputStream {
}
protected void execute(Channel channel) {
OutputStream os = (OutputStream) channel.getExportedObject(oid);
protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(new Runnable() {
public void run() {
channel.unexport(oid);
try {
os.close();
......@@ -265,6 +302,8 @@ final class ProxyOutputStream extends OutputStream {
// ignore errors
}
}
});
}
public String toString() {
return "Pipe.EOF("+oid+")";
......@@ -272,4 +311,35 @@ final class ProxyOutputStream extends OutputStream {
private static final long serialVersionUID = 1L;
}
/**
* {@link Command} to notify the sender that it can send some more data.
*/
private static class Ack extends Command {
/**
* The oid of the {@link OutputStream} on the receiver side of the data.
*/
private final int oid;
/**
* The number of bytes that were freed up.
*/
private final int size;
private Ack(int oid, int size) {
this.oid = oid;
this.size = size;
}
protected void execute(Channel channel) {
channel.getPipeWindow(oid).increase(size);
}
public String toString() {
return "Pipe.Ack("+oid+','+size+")";
}
private static final long serialVersionUID = 1L;
}
private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());
}
package hudson.remoting;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* {@link ExecutorService} that executes synchronously.
*
* @author Kohsuke Kawaguchi
*/
class SynchronousExecutorService extends AbstractExecutorService {
private volatile boolean shutdown = false;
private int count = 0;
public void shutdown() {
shutdown = true;
}
public List<Runnable> shutdownNow() {
shutdown = true;
return Collections.emptyList();
}
public boolean isShutdown() {
return shutdown;
}
public synchronized boolean isTerminated() {
return shutdown && count==0;
}
public synchronized boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long end = System.currentTimeMillis() + unit.toMillis(timeout);
while (count!=0) {
long d = end - System.currentTimeMillis();
if (d<0) return false;
wait(d);
}
return true;
}
public void execute(Runnable command) {
if (shutdown)
throw new IllegalStateException("Already shut down");
touchCount(1);
try {
command.run();
} finally {
touchCount(-1);
}
}
private synchronized void touchCount(int diff) {
count += diff;
if (count==0)
notifyAll();
}
}
/*
* The MIT License
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, InfraDNA, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
......@@ -23,6 +23,7 @@
*/
package hudson.remoting;
import hudson.remoting.Channel.Mode;
import junit.framework.Assert;
import java.io.IOException;
......@@ -49,12 +50,6 @@ interface ChannelRunner {
void stop(Channel channel) throws Exception;
String getName();
Class<? extends ChannelRunner>[] LIST = new Class[] {
InProcess.class,
Fork.class
};
/**
* Runs a channel in the same JVM.
*/
......@@ -66,18 +61,18 @@ interface ChannelRunner {
private Exception failure;
public Channel start() throws Exception {
final PipedInputStream in1 = new PipedInputStream();
final PipedOutputStream out1 = new PipedOutputStream(in1);
final FastPipedInputStream in1 = new FastPipedInputStream();
final FastPipedOutputStream out1 = new FastPipedOutputStream(in1);
final PipedInputStream in2 = new PipedInputStream();
final PipedOutputStream out2 = new PipedOutputStream(in2);
final FastPipedInputStream in2 = new FastPipedInputStream();
final FastPipedOutputStream out2 = new FastPipedOutputStream(in2);
executor = Executors.newCachedThreadPool();
Thread t = new Thread("south bridge runner") {
public void run() {
try {
Channel s = new Channel("south", executor, in2, out1);
Channel s = new Channel("south", executor, Mode.BINARY, in2, out1, null, false, createCapability());
s.join();
System.out.println("south completed");
} catch (IOException e) {
......@@ -91,7 +86,7 @@ interface ChannelRunner {
};
t.start();
return new Channel("north", executor, in1, out2);
return new Channel("north", executor, Mode.BINARY, in1, out2, null, false, createCapability());
}
public void stop(Channel channel) throws Exception {
......@@ -108,6 +103,21 @@ interface ChannelRunner {
public String getName() {
return "local";
}
protected Capability createCapability() {
return new Capability();
}
}
static class InProcessCompatibilityMode extends InProcess {
public String getName() {
return "local-compatibility";
}
@Override
protected Capability createCapability() {
return Capability.NONE;
}
}
/**
......
......@@ -23,8 +23,10 @@
*/
package hudson.remoting;
import hudson.remoting.ChannelRunner.InProcessCompatibilityMode;
import junit.framework.Test;
import java.io.DataInputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.InputStream;
......@@ -89,6 +91,78 @@ public class PipeTest extends RmiTestBase {
assertEquals(5,r);
}
public interface ISaturationTest {
void ensureConnected() throws IOException;
int readFirst() throws IOException;
void readRest() throws IOException;
}
public void testSaturation() throws Exception {
if (channelRunner instanceof InProcessCompatibilityMode)
return; // can't do this test without the throttling support.
final Pipe p = Pipe.createLocalToRemote();
Thread writer = new Thread() {
@Override
public void run() {
OutputStream os = p.getOut();
try {
byte[] buf = new byte[Channel.PIPE_WINDOW_SIZE*2+1];
os.write(buf);
} catch (IOException e) {
e.printStackTrace();
}
}
};
// 1. wait until the receiver sees the first byte. at this point the pipe should be completely clogged
// 2. make sure the writer thread is still alive, blocking
// 3. read the rest
ISaturationTest target = channel.call(new CreateSaturationTestProxy(p));
// make sure the pipe is connected
target.ensureConnected();
writer.start();
// make sure that some data arrived to the receiver
// at this point the pipe should be fully clogged
assertEquals(0,target.readFirst());
// the writer should be still blocked
Thread.sleep(1000);
assertTrue(writer.isAlive());
target.readRest();
}
private static class CreateSaturationTestProxy implements Callable<ISaturationTest,IOException> {
private final Pipe pipe;
public CreateSaturationTestProxy(Pipe pipe) {
this.pipe = pipe;
}
public ISaturationTest call() throws IOException {
return Channel.current().export(ISaturationTest.class, new ISaturationTest() {
private InputStream in;
public void ensureConnected() throws IOException {
in = pipe.getIn();
in.available();
}
public int readFirst() throws IOException {
return in.read();
}
public void readRest() throws IOException {
new DataInputStream(in).readFully(new byte[Channel.PIPE_WINDOW_SIZE*2]);
}
});
}
}
private static class ReadingCallable implements Callable<Integer, IOException> {
private final Pipe pipe;
......
/*
* The MIT License
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, InfraDNA, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
......@@ -33,10 +33,15 @@ import junit.framework.TestSuite;
*
* @author Kohsuke Kawaguchi
*/
@WithRunner({
ChannelRunner.InProcess.class,
ChannelRunner.InProcessCompatibilityMode.class,
ChannelRunner.Fork.class
})
public abstract class RmiTestBase extends TestCase {
protected Channel channel;
private ChannelRunner channelRunner = new InProcess();
protected ChannelRunner channelRunner = new InProcess();
protected void setUp() throws Exception {
System.out.println("Starting "+getName());
......@@ -68,7 +73,10 @@ public abstract class RmiTestBase extends TestCase {
*/
protected static Test buildSuite(Class<? extends RmiTestBase> testClass) {
TestSuite suite = new TestSuite();
for( Class<? extends ChannelRunner> r : ChannelRunner.LIST ) {
WithRunner wr = testClass.getAnnotation(WithRunner.class);
for( Class<? extends ChannelRunner> r : wr.value() ) {
suite.addTest(new ChannelTestSuite(testClass,r));
}
return suite;
......
/*
* The MIT License
*
* Copyright (c) 2010, InfraDNA, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package hudson.remoting;
import java.lang.annotation.Documented;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
/**
* Specify the channel runners for a test case.
* @author Kohsuke Kawaguchi
*/
@Retention(RUNTIME)
@Target(TYPE)
@Documented
@Inherited
public @interface WithRunner {
Class<? extends ChannelRunner>[] value();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册