提交 4c10efe1 编写于 作者: B Brian Atkinson

Merge branch 'JENKINS-8592'

......@@ -599,9 +599,6 @@ public class Channel implements VirtualChannel, IChannel {
}
/*package*/ PipeWindow getPipeWindow(int oid) {
if (!remoteCapability.supportsPipeThrottling())
return PipeWindow.FAKE;
synchronized (pipeWindows) {
Key k = new Key(oid);
WeakReference<PipeWindow> v = pipeWindows.get(k);
......@@ -611,7 +608,11 @@ public class Channel implements VirtualChannel, IChannel {
return w;
}
Real w = new Real(k, PIPE_WINDOW_SIZE);
PipeWindow w;
if (remoteCapability.supportsPipeThrottling())
w = new Real(k, PIPE_WINDOW_SIZE);
else
w = new PipeWindow.Fake();
pipeWindows.put(k,new WeakReference<PipeWindow>(w));
return w;
}
......
......@@ -47,6 +47,10 @@ abstract class Command implements Serializable {
this(true);
}
protected Command(Throwable cause) {
this.createdAt = new Source(cause);
}
/**
* @param recordCreatedAt
* If false, skip the recording of where the command is created. This makes the trouble-shooting
......@@ -74,6 +78,10 @@ abstract class Command implements Serializable {
public Source() {
}
private Source(Throwable cause) {
super(cause);
}
public String toString() {
return "Command "+Command.this.toString()+" created at";
}
......
......@@ -93,19 +93,7 @@ public final class Pipe implements Serializable {
* Gets the reading end of the pipe.
*/
public InputStream getIn() {
return new FilterInputStream(in) {
@Override
public void close() throws IOException {
try {
// Since closing the reading side does not stop the writing side. We read till the stream is done.
final byte[] buffer = new byte[4096];
while(read(buffer) != -1) {
}
} finally {
super.close();
}
}
};
return in;
}
/**
......
......@@ -23,6 +23,7 @@
*/
package hudson.remoting;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Logger;
......@@ -47,21 +48,46 @@ import static java.util.logging.Level.*;
* @author Kohsuke Kawaguchi
*/
abstract class PipeWindow {
protected Throwable dead;
abstract void increase(int delta);
abstract int peek();
/**
* Blocks until some space becomes available.
*
* @throws IOException
* If we learned that there is an irrecoverable problem on the remote side that prevents us from writing.
* @throws InterruptedException
* If a thread was interrupted while blocking.
*/
abstract int get() throws InterruptedException;
abstract int get() throws InterruptedException, IOException;
abstract void decrease(int delta);
/**
* Indicates that the remote end has died and all the further send attempt should fail.
*/
void dead(Throwable cause) {
this.dead = cause;
}
/**
* If we already know that the remote end had developed a problem, throw an exception.
* Otherwise no-op.
*/
protected void checkDeath() throws IOException {
if (dead!=null)
// the remote end failed to write.
throw (IOException)new IOException("Pipe is already closed").initCause(dead);
}
/**
* Fake implementation used when the receiver side doesn't support throttling.
*/
static final PipeWindow FAKE = new PipeWindow() {
static class Fake extends PipeWindow {
void increase(int delta) {
}
......@@ -69,13 +95,14 @@ abstract class PipeWindow {
return Integer.MAX_VALUE;
}
int get() throws InterruptedException {
int get() throws InterruptedException, IOException {
checkDeath();
return Integer.MAX_VALUE;
}
void decrease(int delta) {
}
};
}
static final class Key {
public final int oid;
......@@ -134,13 +161,15 @@ abstract class PipeWindow {
* to avoid fragmenting the window size. That is, if a bunch of small ACKs come in a sequence,
* bundle them up into a bigger size before making a call.
*/
public int get() throws InterruptedException {
public int get() throws InterruptedException, IOException {
checkDeath();
synchronized (this) {
if (available>0)
return available;
while (available<=0) {
wait();
checkDeath();
}
}
......
......@@ -184,12 +184,23 @@ final class ProxyOutputStream extends OutputStream {
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to write to stream",e);
try {
channel.send(new NotifyDeadWriter(e,oid));
} catch (ChannelClosedException x) {
// the other direction can be already closed if the connection
// shut down is initiated from this side. In that case, remain silent.
} catch (IOException x) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to notify the sender that the write end is dead",x);
LOGGER.log(Level.WARNING, "... the failed write was:",e);
}
} finally {
if (channel.remoteCapability.supportsPipeThrottling()) {
try {
channel.send(new Ack(oid,buf.length));
} catch (ChannelClosedException x) {
// the other direction can be already closed if the connection
// shut down is initiated from this side. In that case, remain silent.
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to ack the stream",e);
......@@ -329,5 +340,29 @@ final class ProxyOutputStream extends OutputStream {
private static final long serialVersionUID = 1L;
}
/**
* {@link Command} to notify the sender that the receiver is dead.
*/
private static final class NotifyDeadWriter extends Command {
private final int oid;
private NotifyDeadWriter(Throwable cause, int oid) {
super(cause);
this.oid = oid;
}
@Override
protected void execute(Channel channel) {
PipeWindow w = channel.getPipeWindow(oid);
w.dead(createdAt.getCause());
}
public String toString() {
return "Pipe.Dead("+oid+")";
}
private static final long serialVersionUID = 1L;
}
private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());
}
......@@ -24,13 +24,11 @@
package hudson.remoting;
import hudson.remoting.ChannelRunner.InProcessCompatibilityMode;
import hudson.remoting.FastPipedInputStream.ClosedBy;
import junit.framework.Test;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.jvnet.hudson.test.Bug;
import org.jvnet.hudson.test.For;
import org.jvnet.hudson.test.Url;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
......@@ -39,9 +37,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.concurrent.ExecutionException;
/**
* Test {@link Pipe}.
......@@ -64,57 +60,46 @@ public class PipeTest extends RmiTestBase implements Serializable {
}
/**
* Helper class for testPartialReadQuietlyConsumesPipeOnClose.
*/
private static class EventCounterHandler extends Handler {
int count = 0;
@Override
public void publish(final LogRecord record) {
if (ProxyOutputStream.class.getName().equals(record.getLoggerName())) {
Throwable thrown = record.getThrown();
while (thrown != null) {
if (thrown instanceof ClosedBy) {
count += 1;
break;
}
thrown = thrown.getCause();
}
}
}
@Override
public void flush() {
}
@Override
public void close() throws SecurityException {
}
public int getCount() {
return count;
}
}
/**
* This test should be reproducing the initial bug as reported in JENKINS-8592. The assert in this test is not the
* best and is fragile, but it is the best I can come up with.
* Have the reader close the read end of the pipe while the writer is still writing.
* The writer should pick up a failure.
*/
@Bug(8592)
@For(Pipe.class)
@Url("http://issues.jenkins-ci.org/browse/JENKINS-8592")
public void testPartialReadQuietlyConsumesPipeOnClose() throws Exception {
final EventCounterHandler handler = new EventCounterHandler();
final Logger logger = Logger.getLogger(ProxyOutputStream.class.getName());
logger.addHandler(handler);
public void testReaderCloseWhileWriterIsStillWriting() throws Exception {
final Pipe p = Pipe.createRemoteToLocal();
final Future<Integer> f = channel.callAsync(new WritingCallable(p));
final Future<Void> f = channel.callAsync(new InfiniteWriter(p));
final InputStream in = p.getIn();
assertEquals(in.read(), 0);
in.close();
f.get();
logger.removeHandler(handler);
assertEquals(0, handler.getCount());
try {
f.get();
fail();
} catch (ExecutionException e) {
// should have resulted in an IOException
if (!(e.getCause() instanceof IOException)) {
e.printStackTrace();
fail();
}
}
}
/**
* Just writes forever to the pipe
*/
private static class InfiniteWriter implements Callable<Void, Exception> {
private final Pipe pipe;
public InfiniteWriter(Pipe pipe) {
this.pipe = pipe;
}
public Void call() throws Exception {
while (true) {
pipe.getOut().write(0);
Thread.sleep(10);
}
}
}
private static class WritingCallable implements Callable<Integer, IOException> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册