提交 9812db70 编写于 作者: J Jesse Glick

Improved handling of stream closure.

Making interrupt of `build -s` work in `-http` mode.
Does not solve `ReadPendingException`s.
上级 f6314b7b
......@@ -728,34 +728,39 @@ public class CLI implements AutoCloseable {
protected void onStderr(byte[] chunk) throws IOException {
System.err.write(chunk);
}
}
final ClientSideImpl connection = new ClientSideImpl(streams.getInputStream(), streams.getOutputStream());
for (String arg : args) {
connection.sendArg(arg);
}
connection.sendEncoding(Charset.defaultCharset().name());
connection.sendLocale(Locale.getDefault().toString());
connection.sendStart();
connection.begin();
final OutputStream stdin = connection.streamStdin();
new Thread("input reader") {
@Override
public void run() {
try {
int c;
while ((c = System.in.read()) != -1) { // TODO use InputStream.available
stdin.write(c);
protected synchronized void handleClose() {
notifyAll();
}
}
try (final ClientSideImpl connection = new ClientSideImpl(streams.getInputStream(), streams.getOutputStream())) {
for (String arg : args) {
connection.sendArg(arg);
}
connection.sendEncoding(Charset.defaultCharset().name());
connection.sendLocale(Locale.getDefault().toString());
connection.sendStart();
connection.begin();
final OutputStream stdin = connection.streamStdin();
new Thread("input reader") {
@Override
public void run() {
try {
int c;
while ((c = System.in.read()) != -1) { // TODO use InputStream.available
stdin.write(c);
}
connection.sendEndStdin();
} catch (IOException x) {
x.printStackTrace();
}
connection.sendEndStdin();
} catch (IOException x) {
x.printStackTrace();
}
}.start();
synchronized (connection) {
connection.wait();
}
}.start();
synchronized (connection) {
connection.wait();
return connection.exit;
}
return connection.exit;
}
private static String computeVersion() {
......
......@@ -25,12 +25,14 @@
package hudson.cli;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadPendingException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
......@@ -68,7 +70,7 @@ class PlainCLIProtocol {
STDERR
}
static abstract class EitherSide {
static abstract class EitherSide implements Closeable {
private final CountingInputStream cis;
private final FlightRecorderInputStream flightRecorder;
......@@ -83,51 +85,71 @@ class PlainCLIProtocol {
}
final void begin() {
new Thread("PlainCLIProtocol") { // TODO set distinctive Thread.name
@Override
public void run() {
try {
while (true) {
LOGGER.finest("reading frame");
int framelen;
try {
framelen = dis.readInt();
} catch (EOFException x) {
break; // TODO verify that we hit EOF immediately, not partway into framelen
}
if (framelen < 0) {
throw new IOException("corrupt stream: negative frame length");
}
byte b = dis.readByte();
if (b < 0) { // i.e., >127
throw new IOException("corrupt stream: negative operation code");
}
if (b >= Op.values().length) {
LOGGER.log(Level.WARNING, "unknown operation #{0}: {1}", new Object[] {b, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
continue;
}
Op op = Op.values()[b];
long start = cis.getByteCount();
LOGGER.log(Level.FINEST, "handling frame with {0} of length {1}", new Object[] {op, framelen});
boolean handled = handle(op, framelen);
if (handled) {
long actuallyRead = cis.getByteCount() - start;
if (actuallyRead != framelen) {
throw new IOException("corrupt stream: expected to read " + framelen + " bytes from " + op + " but read " + actuallyRead);
}
} else {
LOGGER.log(Level.WARNING, "unexpected {0}: {1}", new Object[] {op, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
new Reader().start();
}
private class Reader extends Thread {
Reader() {
super("PlainCLIProtocol"); // TODO set distinctive Thread.name
}
@Override
public void run() {
try {
while (true) {
LOGGER.finest("reading frame");
int framelen;
try {
framelen = dis.readInt();
} catch (EOFException x) {
handleClose();
break; // TODO verify that we hit EOF immediately, not partway into framelen
} catch (ReadPendingException x) {
LOGGER.log(Level.FINE, null, x);
// TODO what does this signify? Seems to be thrown randomly by org.eclipse.jetty.io.FillInterest.register. No obvious impact.
// Check https://github.com/eclipse/jetty.project/issues/1047 in 9.4.3.v20170317 but cf. https://github.com/joakime/jetty-async-bug/issues/1
handleClose();
break;
}
if (framelen < 0) {
throw new IOException("corrupt stream: negative frame length");
}
byte b = dis.readByte();
if (b < 0) { // i.e., >127
throw new IOException("corrupt stream: negative operation code");
}
if (b >= Op.values().length) {
LOGGER.log(Level.WARNING, "unknown operation #{0}: {1}", new Object[] {b, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
continue;
}
Op op = Op.values()[b];
long start = cis.getByteCount();
LOGGER.log(Level.FINEST, "handling frame with {0} of length {1}", new Object[] {op, framelen});
boolean handled = handle(op, framelen);
if (handled) {
long actuallyRead = cis.getByteCount() - start;
if (actuallyRead != framelen) {
throw new IOException("corrupt stream: expected to read " + framelen + " bytes from " + op + " but read " + actuallyRead);
}
} else {
LOGGER.log(Level.WARNING, "unexpected {0}: {1}", new Object[] {op, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
}
}
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, flightRecorder.analyzeCrash(x, "broken stream"));
}
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, flightRecorder.analyzeCrash(x, "broken stream"));
} catch (ReadPendingException x) { // as above
LOGGER.log(Level.FINE, null, x);
handleClose();
}
}.start();
}
}
protected abstract void handleClose();
protected abstract boolean handle(Op op, int framelen) throws IOException;
private void writeOp(Op op) throws IOException {
......@@ -165,6 +187,7 @@ class PlainCLIProtocol {
}
protected final byte[] readChunk(int framelen) throws IOException {
assert Thread.currentThread() instanceof EitherSide.Reader;
byte[] buf = new byte[framelen];
dis.readFully(buf);
return buf;
......@@ -187,6 +210,11 @@ class PlainCLIProtocol {
};
}
@Override
public synchronized void close() throws IOException {
dos.close();
}
}
static abstract class ServerSide extends EitherSide {
......@@ -197,6 +225,7 @@ class PlainCLIProtocol {
@Override
protected final boolean handle(Op op, int framelen) throws IOException {
assert Thread.currentThread() instanceof EitherSide.Reader;
switch (op) {
case ARG:
onArg(dis.readUTF());
......@@ -255,6 +284,7 @@ class PlainCLIProtocol {
@Override
protected boolean handle(Op op, int framelen) throws IOException {
assert Thread.currentThread() instanceof EitherSide.Reader;
switch (op) {
case EXIT:
onExit(dis.readInt());
......
......@@ -55,6 +55,7 @@ import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.util.FullDuplexHttpService;
......@@ -132,6 +133,7 @@ public class CLIAction implements UnprotectedRootAction, StaplerProxy {
return new FullDuplexHttpService(uuid) {
@Override
protected void run(InputStream upload, OutputStream download) throws IOException, InterruptedException {
final AtomicReference<Thread> runningThread = new AtomicReference<>();
class ServerSideImpl extends PlainCLIProtocol.ServerSide {
List<String> args = new ArrayList<>();
Locale locale = Locale.getDefault();
......@@ -176,30 +178,46 @@ public class CLIAction implements UnprotectedRootAction, StaplerProxy {
protected void onEndStdin() throws IOException {
stdinMatch.close();
}
@Override
protected synchronized void handleClose() {
notifyAll();
Thread t = runningThread.get();
if (t != null) {
t.interrupt();
}
}
}
ServerSideImpl connection = new ServerSideImpl(upload, download);
connection.begin();
synchronized (connection) {
connection.wait(); // TODO this can wait indefinitely even when the connection is broken
}
PrintStream stdout = new PrintStream(connection.streamStdout(), false, connection.encoding.name());
PrintStream stderr = new PrintStream(connection.streamStderr(), true, connection.encoding.name());
String commandName = connection.args.get(0);
CLICommand command = CLICommand.clone(commandName);
if (command == null) {
stderr.println("No such command " + commandName);
connection.sendExit(2);
return;
}
command.setTransportAuth(Jenkins.getAuthentication());
command.setClientCharset(connection.encoding);
CLICommand orig = CLICommand.setCurrent(command);
try {
int exit = command.main(connection.args.subList(1, connection.args.size()), connection.locale, connection.stdin, stdout, stderr);
stdout.flush();
connection.sendExit(exit);
} finally {
CLICommand.setCurrent(orig);
try (ServerSideImpl connection = new ServerSideImpl(upload, download)) {
connection.begin();
synchronized (connection) {
connection.wait();
}
PrintStream stdout = new PrintStream(connection.streamStdout(), false, connection.encoding.name());
PrintStream stderr = new PrintStream(connection.streamStderr(), true, connection.encoding.name());
if (connection.args.isEmpty()) {
stderr.println("Connection closed before arguments received");
connection.sendExit(2);
return;
}
String commandName = connection.args.get(0);
CLICommand command = CLICommand.clone(commandName);
if (command == null) {
stderr.println("No such command " + commandName);
connection.sendExit(2);
return;
}
command.setTransportAuth(Jenkins.getAuthentication());
command.setClientCharset(connection.encoding);
CLICommand orig = CLICommand.setCurrent(command);
try {
runningThread.set(Thread.currentThread());
int exit = command.main(connection.args.subList(1, connection.args.size()), connection.locale, connection.stdin, stdout, stderr);
stdout.flush();
connection.sendExit(exit);
} finally {
CLICommand.setCurrent(orig);
runningThread.set(null);
}
}
}
};
......
......@@ -120,9 +120,7 @@ public class CLITest {
p.getBuildersList().add(new SleepBuilder(TimeUnit.MINUTES.toMillis(5)));
doInterrupt(jar, p, "-remoting", "-i", privkey.getAbsolutePath());
doInterrupt(jar, p, "-ssh", "-user", "admin", "-i", privkey.getAbsolutePath());
/* TODO does not yet work in HTTP mode:
doInterrupt(jar, p, "-http", "-auth", "admin:admin");
*/
}
private void doInterrupt(File jar, FreeStyleProject p, String... modeArgs) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册