提交 8302b850 编写于 作者: J Jesse Glick

Draft of a new CLI transport that can operate over CLIAction with the regular HTTP(S) port.

上级 2fe2487b
......@@ -34,6 +34,10 @@
<artifactId>commons-codec</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>remoting</artifactId>
......
......@@ -62,6 +62,7 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.PublicKey;
......@@ -76,6 +77,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import static java.util.logging.Level.*;
......@@ -105,6 +107,7 @@ public class CLI implements AutoCloseable {
private final String httpsProxyTunnel;
private final String authorization;
/** Connection via {@link Mode#REMOTING}, for tests only. */
public CLI(URL jenkins) throws IOException, InterruptedException {
this(jenkins,null);
}
......@@ -126,7 +129,8 @@ public class CLI implements AutoCloseable {
public CLI(URL jenkins, ExecutorService exec, String httpsProxyTunnel) throws IOException, InterruptedException {
this(new CLIConnectionFactory().url(jenkins).executorService(exec).httpsProxyTunnel(httpsProxyTunnel));
}
/** Connection via {@link Mode#REMOTING}. */
/*package*/ CLI(CLIConnectionFactory factory) throws IOException, InterruptedException {
URL jenkins = factory.jenkins;
this.httpsProxyTunnel = factory.httpsProxyTunnel;
......@@ -162,9 +166,8 @@ public class CLI implements AutoCloseable {
}
private Channel connectViaHttp(String url) throws IOException {
LOGGER.log(FINE, "Trying to connect to {0} via HTTP", url);
url+="cli";
URL jenkins = new URL(url);
LOGGER.log(FINE, "Trying to connect to {0} via Remoting over HTTP", url);
URL jenkins = new URL(url + "cli?remoting=true");
FullDuplexHttpStream con = new FullDuplexHttpStream(jenkins,authorization);
Channel ch = new Channel("Chunked connection to "+jenkins,
......@@ -181,7 +184,7 @@ public class CLI implements AutoCloseable {
}
private Channel connectViaCliPort(URL jenkins, CliPort clip) throws IOException {
LOGGER.log(FINE, "Trying to connect directly via TCP/IP to {0}", clip.endpoint);
LOGGER.log(FINE, "Trying to connect directly via Remoting over TCP/IP to {0}", clip.endpoint);
final Socket s = new Socket();
// this prevents a connection from silently terminated by the router in between or the other peer
// and that goes without unnoticed. However, the time out is often very long (for example 2 hours
......@@ -391,12 +394,6 @@ public class CLI implements AutoCloseable {
}
public static void main(final String[] _args) throws Exception {
// Logger l = Logger.getLogger(Channel.class.getName());
// l.setLevel(ALL);
// ConsoleHandler h = new ConsoleHandler();
// h.setLevel(ALL);
// l.addHandler(h);
//
try {
System.exit(_main(_args));
} catch (Throwable t) {
......@@ -406,6 +403,7 @@ public class CLI implements AutoCloseable {
}
}
private enum Mode {HTTP, SSH, REMOTING}
public static int _main(String[] _args) throws Exception {
List<String> args = Arrays.asList(_args);
PrivateKeyProvider provider = new PrivateKeyProvider();
......@@ -419,7 +417,7 @@ public class CLI implements AutoCloseable {
boolean tryLoadPKey = true;
boolean useRemoting = false;
Mode mode = null;
String user = null;
......@@ -429,10 +427,29 @@ public class CLI implements AutoCloseable {
System.out.println("Version: "+computeVersion());
return 0;
}
if (head.equals("-http")) {
if (mode != null) {
printUsage("-http clashes with previously defined mode " + mode);
return -1;
}
mode = Mode.HTTP;
args = args.subList(1, args.size());
}
if (head.equals("-ssh")) {
if (mode != null) {
printUsage("-ssh clashes with previously defined mode " + mode);
return -1;
}
mode = Mode.SSH;
args = args.subList(1, args.size());
}
if (head.equals("-remoting")) {
useRemoting = true;
args = args.subList(1,args.size());
continue;
if (mode != null) {
printUsage("-remoting clashes with previously defined mode " + mode);
return -1;
}
mode = Mode.REMOTING;
args = args.subList(1, args.size());
}
if(head.equals("-s") && args.size()>=2) {
url = args.get(1);
......@@ -481,6 +498,17 @@ public class CLI implements AutoCloseable {
args = args.subList(2,args.size());
continue;
}
if (head.equals("-logger") && args.size() >= 2) {
Level level = parse(args.get(1));
ConsoleHandler h = new ConsoleHandler();
h.setLevel(level);
for (Logger logger : new Logger[] {LOGGER, PlainCLIProtocol.LOGGER}) { // perhaps also Channel
logger.setLevel(level);
logger.addHandler(h);
}
args = args.subList(2, args.size());
continue;
}
break;
}
......@@ -495,17 +523,23 @@ public class CLI implements AutoCloseable {
if (tryLoadPKey && !provider.hasKeys())
provider.readFromDefaultLocations();
if (!useRemoting) {
if (mode == null) {
mode = Mode.HTTP;
}
LOGGER.log(FINE, "using connection mode {0}", mode);
if (mode == Mode.SSH) {
if (user == null) {
// TODO SshCliAuthenticator already autodetects the user based on public key; why cannot AsynchronousCommand.getCurrentUser do the same?
System.err.println("-user required when not using -remoting");
System.err.println("-user required when using -ssh");
return -1;
}
return sshConnection(url, user, args, provider);
}
if (user != null) {
System.err.println("Warning: -user ignored when using -remoting");
System.err.println("Warning: -user ignored unless using -ssh");
}
CLIConnectionFactory factory = new CLIConnectionFactory().url(url).httpsProxyTunnel(httpProxy);
......@@ -514,6 +548,10 @@ public class CLI implements AutoCloseable {
factory = factory.basicAuth(userInfo);
}
if (mode == Mode.HTTP) {
return plainHttpConnection(url, args, factory);
}
CLI cli = factory.connect();
try {
if (provider.hasKeys()) {
......@@ -560,7 +598,7 @@ public class CLI implements AutoCloseable {
return -1;
}
System.err.println("Connecting via SSH to: " + endpointDescription);
LOGGER.log(FINE, "Connecting via SSH to: {0}", endpointDescription);
int sshPort = Integer.parseInt(endpointDescription.split(":")[1]);
String sshHost = endpointDescription.split(":")[0];
......@@ -619,6 +657,61 @@ public class CLI implements AutoCloseable {
}
}
private static int plainHttpConnection(String url, List<String> args, CLIConnectionFactory factory) throws IOException, InterruptedException {
LOGGER.log(FINE, "Trying to connect to {0} via plain protocol over HTTP", url);
URL jenkins = new URL(url + "cli?remoting=false");
FullDuplexHttpStream streams = new FullDuplexHttpStream(jenkins, factory.authorization);
class ClientSideImpl extends PlainCLIProtocol.ClientSide {
int exit = -1;
ClientSideImpl(InputStream is, OutputStream os) throws IOException {
super(is, os);
if (is.read() != 0) { // cf. FullDuplexHttpService
throw new IOException("expected to see initial zero byte");
}
}
@Override
protected synchronized void onExit(int code) {
this.exit = code;
notify();
}
@Override
protected void onStdout(byte[] chunk) throws IOException {
System.out.write(chunk);
}
@Override
protected void onStderr(byte[] chunk) throws IOException {
System.err.write(chunk);
}
}
final ClientSideImpl connection = new ClientSideImpl(streams.getInputStream(), streams.getOutputStream());
for (String arg : args) {
connection.sendArg(arg);
}
connection.sendEncoding(Charset.defaultCharset().name());
connection.sendLocale(Locale.getDefault().toString());
connection.sendStart();
connection.begin();
final OutputStream stdin = connection.streamStdin();
new Thread("input reader") {
@Override
public void run() {
try {
int c;
while ((c = System.in.read()) != -1) {
stdin.write(c);
}
connection.sendEndStdin();
} catch (IOException x) {
x.printStackTrace();
}
}
}.start();
synchronized (connection) {
connection.wait();
}
return connection.exit;
}
private static String computeVersion() {
Properties props = new Properties();
try {
......
......@@ -32,6 +32,7 @@ public class CLIConnectionFactory {
/**
* This {@link ExecutorService} is used to execute closures received from the server.
* Used only in Remoting mode.
*/
public CLIConnectionFactory executorService(ExecutorService es) {
this.exec = es;
......@@ -67,7 +68,10 @@ public class CLIConnectionFactory {
public CLIConnectionFactory basicAuth(String userInfo) {
return authorization("Basic " + new String(Base64.encodeBase64((userInfo).getBytes())));
}
/**
* Used only in Remoting mode.
*/
public CLI connect() throws IOException, InterruptedException {
return new CLI(this);
}
......
package hudson.cli;
import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.StringWriter;
// TODO COPIED FROM hudson.remoting
/**
* Signals a {@link StreamCorruptedException} with some additional diagnostic information.
*
* @author Kohsuke Kawaguchi
*/
class DiagnosedStreamCorruptionException extends StreamCorruptedException {
private final Exception diagnoseFailure;
private final byte[] readBack;
private final byte[] readAhead;
DiagnosedStreamCorruptionException(Exception cause, Exception diagnoseFailure, byte[] readBack, byte[] readAhead) {
initCause(cause);
this.diagnoseFailure = diagnoseFailure;
this.readBack = readBack;
this.readAhead = readAhead;
}
public Exception getDiagnoseFailure() {
return diagnoseFailure;
}
public byte[] getReadBack() {
return readBack;
}
public byte[] getReadAhead() {
return readAhead;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(super.toString()).append("\n");
buf.append("Read back: ").append(HexDump.toHex(readBack)).append('\n');
buf.append("Read ahead: ").append(HexDump.toHex(readAhead));
if (diagnoseFailure!=null) {
StringWriter w = new StringWriter();
PrintWriter p = new PrintWriter(w);
diagnoseFailure.printStackTrace(p);
p.flush();
buf.append("\nDiagnosis problem:\n ");
buf.append(w.toString().trim().replace("\n","\n "));
}
return buf.toString();
}
}
package hudson.cli;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
// TODO COPIED FROM hudson.remoting
/**
* Filter input stream that records the content as it's read, so that it can be reported
* in case of a catastrophic stream corruption problem.
*
* @author Kohsuke Kawaguchi
*/
class FlightRecorderInputStream extends InputStream {
/**
* Size (in bytes) of the flight recorder ring buffer used for debugging remoting issues.
* @since 2.41
*/
static final int BUFFER_SIZE = Integer.getInteger("hudson.remoting.FlightRecorderInputStream.BUFFER_SIZE", 1024 * 1024);
private final InputStream source;
private ByteArrayRingBuffer recorder = new ByteArrayRingBuffer(BUFFER_SIZE);
FlightRecorderInputStream(InputStream source) {
this.source = source;
}
/**
* Rewinds the record buffer and forget everything that was recorded.
*/
public void clear() {
recorder = new ByteArrayRingBuffer(BUFFER_SIZE);
}
/**
* Gets the recorded content.
*/
public byte[] getRecord() {
return recorder.toByteArray();
}
/**
* Creates a {@link DiagnosedStreamCorruptionException} based on the recorded content plus read ahead.
* The caller is responsible for throwing the exception.
*/
public DiagnosedStreamCorruptionException analyzeCrash(Exception problem, String diagnosisName) {
final ByteArrayOutputStream readAhead = new ByteArrayOutputStream();
final IOException[] error = new IOException[1];
Thread diagnosisThread = new Thread(diagnosisName+" stream corruption diagnosis thread") {
public void run() {
int b;
try {
// not all InputStream will look for the thread interrupt flag, so check that explicitly to be defensive
while (!Thread.interrupted() && (b=source.read())!=-1) {
readAhead.write(b);
}
} catch (IOException e) {
error[0] = e;
}
}
};
// wait up to 1 sec to grab as much data as possible
diagnosisThread.start();
try {
diagnosisThread.join(1000);
} catch (InterruptedException ignored) {
// we are only waiting for a fixed amount of time, so we'll pretend like we were in a busy loop
Thread.currentThread().interrupt();
// fall through
}
IOException diagnosisProblem = error[0]; // capture the error, if any, before we kill the thread
if (diagnosisThread.isAlive())
diagnosisThread.interrupt(); // if it's not dead, kill
return new DiagnosedStreamCorruptionException(problem,diagnosisProblem,getRecord(),readAhead.toByteArray());
}
@Override
public int read() throws IOException {
int i = source.read();
if (i>=0)
recorder.write(i);
return i;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
len = source.read(b, off, len);
if (len>0)
recorder.write(b,off,len);
return len;
}
/**
* To record the bytes we've skipped, convert the call to read.
*/
@Override
public long skip(long n) throws IOException {
byte[] buf = new byte[(int)Math.min(n,64*1024)];
return read(buf,0,buf.length);
}
@Override
public int available() throws IOException {
return source.available();
}
@Override
public void close() throws IOException {
source.close();
}
@Override
public boolean markSupported() {
return false;
}
// http://stackoverflow.com/a/3651696/12916
private static class ByteArrayRingBuffer extends OutputStream {
byte[] data;
int capacity, pos = 0;
boolean filled = false;
public ByteArrayRingBuffer(int capacity) {
data = new byte[capacity];
this.capacity = capacity;
}
@Override
public synchronized void write(int b) {
if (pos == capacity) {
filled = true;
pos = 0;
}
data[pos++] = (byte) b;
}
public synchronized byte[] toByteArray() {
if (!filled) {
return Arrays.copyOf(data, pos);
}
byte[] ret = new byte[capacity];
System.arraycopy(data, pos, ret, 0, capacity - pos);
System.arraycopy(data, 0, ret, capacity - pos, pos);
return ret;
}
/** @author @roadrunner2 */
@Override public synchronized void write(byte[] buf, int off, int len) {
// no point in trying to copy more than capacity; this also simplifies logic below
if (len > capacity) {
off += (len - capacity);
len = capacity;
}
// copy to buffer, but no farther than the end
int num = Math.min(len, capacity - pos);
if (num > 0) {
System.arraycopy(buf, off, data, pos, num);
off += num;
len -= num;
pos += num;
}
// wrap around if necessary
if (pos == capacity) {
filled = true;
pos = 0;
}
// copy anything still left
if (len > 0) {
System.arraycopy(buf, off, data, pos, len);
pos += len;
}
}
}
}
package hudson.cli;
// TODO COPIED FROM hudson.remoting
/**
* @author Kohsuke Kawaguchi
*/
class HexDump {
private static final String CODE = "0123456789abcdef";
public static String toHex(byte[] buf) {
return toHex(buf,0,buf.length);
}
public static String toHex(byte[] buf, int start, int len) {
StringBuilder r = new StringBuilder(len*2);
boolean inText = false;
for (int i=0; i<len; i++) {
byte b = buf[start+i];
if (b >= 0x20 && b <= 0x7e) {
if (!inText) {
inText = true;
r.append('\'');
}
r.append((char) b);
} else {
if (inText) {
r.append("' ");
inText = false;
}
r.append("0x");
r.append(CODE.charAt((b>>4)&15));
r.append(CODE.charAt(b&15));
if (i < len - 1) {
if (b == 10) {
r.append('\n');
} else {
r.append(' ');
}
}
}
}
if (inText) {
r.append('\'');
}
return r.toString();
}
}
/*
* The MIT License
*
* Copyright 2017 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package hudson.cli;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.CountingInputStream;
/**
* CLI protocol working over a plain socket-like connection, without SSH or Remoting.
* Each side consists of frames starting with an {@code int} length,
* then a {@code byte} opcode, then any opcode-specific data.
* The length does not count the length field itself nor the opcode, so it is nonnegative.
*/
class PlainCLIProtocol {
static final Logger LOGGER = Logger.getLogger(PlainCLIProtocol.class.getName());
/** One-byte operation to send to the other side. */
private enum Op {
/** UTF-8 command name or argument. */
ARG,
/** UTF-8 locale identifier. */
LOCALE,
/** UTF-8 client encoding. */
ENCODING,
/** Start running command. */
START,
/** Exit code, as int. */
EXIT,
/** Chunk of stdin, as int length followed by bytes. */
STDIN,
/** EOF on stdin. */
END_STDIN,
/** Chunk of stdout. */
STDOUT,
/** Chunk of stderr. */
STDERR
}
static abstract class EitherSide {
private final CountingInputStream cis;
private final FlightRecorderInputStream flightRecorder;
final DataInputStream dis;
final DataOutputStream dos;
protected EitherSide(InputStream is, OutputStream os) {
cis = new CountingInputStream(is);
flightRecorder = new FlightRecorderInputStream(cis);
dis = new DataInputStream(flightRecorder);
dos = new DataOutputStream(os);
}
final void begin() {
new Thread("PlainCLIProtocol") { // TODO set distinctive Thread.name
@Override
public void run() {
try {
while (true) {
LOGGER.finest("reading frame");
int framelen;
try {
framelen = dis.readInt();
} catch (EOFException x) {
break; // TODO verify that we hit EOF immediately, not partway into framelen
}
if (framelen < 0) {
throw new IOException("corrupt stream: negative frame length");
}
byte b = dis.readByte();
if (b < 0) { // i.e., >127
throw new IOException("corrupt stream: negative operation code");
}
if (b >= Op.values().length) {
LOGGER.log(Level.WARNING, "unknown operation #{0}: {1}", new Object[] {b, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
continue;
}
Op op = Op.values()[b];
long start = cis.getByteCount();
LOGGER.log(Level.FINEST, "handling frame with {0} of length {1}", new Object[] {op, framelen});
boolean handled = handle(op, framelen);
if (handled) {
long actuallyRead = cis.getByteCount() - start;
if (actuallyRead != framelen) {
throw new IOException("corrupt stream: expected to read " + framelen + " bytes from " + op + " but read " + actuallyRead);
}
} else {
LOGGER.log(Level.WARNING, "unexpected {0}: {1}", new Object[] {op, HexDump.toHex(flightRecorder.getRecord())});
IOUtils.skipFully(dis, framelen);
}
}
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, flightRecorder.analyzeCrash(x, "broken stream"));
}
}
}.start();
}
protected abstract boolean handle(Op op, int framelen) throws IOException;
private void writeOp(Op op) throws IOException {
dos.writeByte((byte) op.ordinal());
}
protected final synchronized void send(Op op) throws IOException {
dos.writeInt(0);
writeOp(op);
dos.flush();
}
protected final synchronized void send(Op op, int number) throws IOException {
dos.writeInt(4);
writeOp(op);
dos.writeInt(number);
dos.flush();
}
protected final synchronized void send(Op op, byte[] chunk, int off, int len) throws IOException {
dos.writeInt(len);
writeOp(op);
dos.write(chunk, off, len);
dos.flush();
}
protected final void send(Op op, byte[] chunk) throws IOException {
send(op, chunk, 0, chunk.length);
}
protected final void send(Op op, String text) throws IOException {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
new DataOutputStream(buf).writeUTF(text);
send(op, buf.toByteArray());
}
protected final byte[] readChunk(int framelen) throws IOException {
byte[] buf = new byte[framelen];
dis.readFully(buf);
return buf;
}
protected final OutputStream stream(final Op op) {
return new OutputStream() {
@Override
public void write(int b) throws IOException {
send(op, new byte[] {(byte) b});
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
send(op, b, off, len);
}
@Override
public void write(byte[] b) throws IOException {
send(op, b);
}
};
}
}
static abstract class ServerSide extends EitherSide {
ServerSide(InputStream is, OutputStream os) {
super(is, os);
}
@Override
protected final boolean handle(Op op, int framelen) throws IOException {
switch (op) {
case ARG:
onArg(dis.readUTF());
return true;
case LOCALE:
onLocale(dis.readUTF());
return true;
case ENCODING:
onEncoding(dis.readUTF());
return true;
case START:
onStart();
return true;
case STDIN:
onStdin(readChunk(framelen));
return true;
case END_STDIN:
onEndStdin();
return true;
default:
return false;
}
}
protected abstract void onArg(String text);
protected abstract void onLocale(String text);
protected abstract void onEncoding(String text);
protected abstract void onStart();
protected abstract void onStdin(byte[] chunk) throws IOException;
protected abstract void onEndStdin() throws IOException;
public final void sendExit(int code) throws IOException {
send(Op.EXIT, code);
}
public final OutputStream streamStdout() {
return stream(Op.STDOUT);
}
public final OutputStream streamStderr() {
return stream(Op.STDERR);
}
}
static abstract class ClientSide extends EitherSide {
ClientSide(InputStream is, OutputStream os) {
super(is, os);
}
@Override
protected boolean handle(Op op, int framelen) throws IOException {
switch (op) {
case EXIT:
onExit(dis.readInt());
return true;
case STDOUT:
onStdout(readChunk(framelen));
return true;
case STDERR:
onStderr(readChunk(framelen));
return true;
default:
return false;
}
}
protected abstract void onExit(int code);
protected abstract void onStdout(byte[] chunk) throws IOException;
protected abstract void onStderr(byte[] chunk) throws IOException;
public final void sendArg(String text) throws IOException {
send(Op.ARG, text);
}
public final void sendLocale(String text) throws IOException {
send(Op.LOCALE, text);
}
public final void sendEncoding(String text) throws IOException {
send(Op.ENCODING, text);
}
public final void sendStart() throws IOException {
send(Op.START);
}
public final OutputStream streamStdin() {
return stream(Op.STDIN);
}
public final void sendEndStdin() throws IOException {
send(Op.END_STDIN);
}
}
private PlainCLIProtocol() {}
}
......@@ -2,12 +2,15 @@ CLI.Usage=Jenkins CLI\n\
Usage: java -jar jenkins-cli.jar [-s URL] command [opts...] args...\n\
Options:\n\
-s URL : the server URL (defaults to the JENKINS_URL env var)\n\
-i KEY : SSH private key file used for authentication\n\
-http : use a plain CLI protocol over HTTP(S) (the default; mutually exclusive with -ssh and -remoting)\n\
-ssh : use SSH protocol (requires -user; SSH port must be open on server, and user must have registered a public key)\n\
-remoting : use deprecated Remoting channel protocol (if enabled on server; for compatibility with legacy commands or command modes only)\n\
-i KEY : SSH private key file used for authentication (for use with -ssh or -remoting)\n\
-p HOST:PORT : HTTP proxy host and port for HTTPS proxy tunneling. See https://jenkins.io/redirect/cli-https-proxy-tunnel\n\
-noCertificateCheck : bypass HTTPS certificate check entirely. Use with caution\n\
-noKeyAuth : don't try to load the SSH authentication private key. Conflicts with -i\n\
-remoting : use deprecated Remoting channel protocol (if enabled on server; for compatibility with legacy commands or command modes only)\n\
-user : specify user (for SSH mode, not -remoting)\n\
-user : specify user (for use with -ssh)\n\
-logger FINE : enable detailed logging from the client\n\
\n\
The available commands depend on the server. Run the 'help' command to\n\
see the list.
......
......@@ -37,7 +37,6 @@ import jenkins.model.Jenkins;
import org.jenkinsci.Symbol;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import org.kohsuke.stapler.HttpResponses.HttpResponseException;
import org.kohsuke.stapler.Stapler;
import org.kohsuke.stapler.StaplerProxy;
import org.kohsuke.stapler.StaplerRequest;
......@@ -46,6 +45,19 @@ import org.kohsuke.stapler.StaplerResponse;
import hudson.Extension;
import hudson.model.FullDuplexHttpChannel;
import hudson.remoting.Channel;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.util.FullDuplexHttpService;
/**
* Shows usage of CLI and commands.
......@@ -56,7 +68,9 @@ import hudson.remoting.Channel;
@Restricted(NoExternalUse.class)
public class CLIAction implements UnprotectedRootAction, StaplerProxy {
private transient final Map<UUID,FullDuplexHttpChannel> duplexChannels = new HashMap<UUID, FullDuplexHttpChannel>();
private static final Logger LOGGER = Logger.getLogger(CLIAction.class.getName());
private transient final Map<UUID, FullDuplexHttpService> duplexServices = new HashMap<>();
public String getIconFileName() {
return null;
......@@ -100,36 +114,95 @@ public class CLIAction implements UnprotectedRootAction, StaplerProxy {
/**
* Serves CLI-over-HTTP response.
*/
private class CliEndpointResponse extends HttpResponseException {
private class CliEndpointResponse extends FullDuplexHttpService.Response {
CliEndpointResponse() {
super(duplexServices);
}
@Override
public void generateResponse(StaplerRequest req, StaplerResponse rsp, Object node) throws IOException, ServletException {
try {
// do not require any permission to establish a CLI connection
// the actual authentication for the connecting Channel is done by CLICommand
UUID uuid = UUID.fromString(req.getHeader("Session"));
rsp.setHeader("Hudson-Duplex",""); // set the header so that the client would know
FullDuplexHttpChannel server;
if(req.getHeader("Side").equals("download")) {
duplexChannels.put(uuid,server=new FullDuplexHttpChannel(uuid, !Jenkins.getActiveInstance().hasPermission(Jenkins.ADMINISTER)) {
@Override
protected void main(Channel channel) throws IOException, InterruptedException {
// capture the identity given by the transport, since this can be useful for SecurityRealm.createCliAuthenticator()
channel.setProperty(CLICommand.TRANSPORT_AUTHENTICATION, Jenkins.getAuthentication());
channel.setProperty(CliEntryPoint.class.getName(),new CliManagerImpl(channel));
protected FullDuplexHttpService createService(StaplerRequest req, UUID uuid) throws IOException {
// do not require any permission to establish a CLI connection
// the actual authentication for the connecting Channel is done by CLICommand
if ("false".equals(req.getParameter("remoting"))) {
return new FullDuplexHttpService(uuid) {
@Override
protected void run(InputStream upload, OutputStream download) throws IOException, InterruptedException {
class ServerSideImpl extends PlainCLIProtocol.ServerSide {
List<String> args = new ArrayList<>();
Locale locale = Locale.getDefault();
Charset encoding = Charset.defaultCharset();
final PipedInputStream stdin = new PipedInputStream();
final PipedOutputStream stdinMatch = new PipedOutputStream();
ServerSideImpl(InputStream is, OutputStream os) throws IOException {
super(is, os);
stdinMatch.connect(stdin);
}
@Override
protected void onArg(String text) {
args.add(text);
}
@Override
protected void onLocale(String text) {
// TODO what is the opposite of Locale.toString()?
}
@Override
protected void onEncoding(String text) {
try {
encoding = Charset.forName(text);
} catch (UnsupportedCharsetException x) {
LOGGER.log(Level.WARNING, "unknown client charset {0}", text);
}
}
@Override
protected synchronized void onStart() {
notify();
}
@Override
protected void onStdin(byte[] chunk) throws IOException {
stdinMatch.write(chunk);
}
@Override
protected void onEndStdin() throws IOException {
stdinMatch.close();
}
}
});
try {
server.download(req,rsp);
} finally {
duplexChannels.remove(uuid);
ServerSideImpl connection = new ServerSideImpl(upload, download);
connection.begin();
synchronized (connection) {
connection.wait(); // TODO this can wait indefinitely even when the connection is broken
}
PrintStream stdout = new PrintStream(connection.streamStdout(), false, connection.encoding.name());
PrintStream stderr = new PrintStream(connection.streamStderr(), true, connection.encoding.name());
String commandName = connection.args.get(0);
CLICommand command = CLICommand.clone(commandName);
if (command == null) {
stderr.println("No such command " + commandName);
connection.sendExit(2);
return;
}
command.setTransportAuth(Jenkins.getAuthentication());
command.setClientCharset(connection.encoding);
CLICommand orig = CLICommand.setCurrent(command);
try {
int exit = command.main(connection.args.subList(1, connection.args.size()), connection.locale, connection.stdin, stdout, stderr);
stdout.flush();
connection.sendExit(exit);
} finally {
CLICommand.setCurrent(orig);
}
}
};
} else {
return new FullDuplexHttpChannel(uuid, !Jenkins.getInstance().hasPermission(Jenkins.ADMINISTER)) {
@Override
protected void main(Channel channel) throws IOException, InterruptedException {
// capture the identity given by the transport, since this can be useful for SecurityRealm.createCliAuthenticator()
channel.setProperty(CLICommand.TRANSPORT_AUTHENTICATION, Jenkins.getAuthentication());
channel.setProperty(CliEntryPoint.class.getName(), new CliManagerImpl(channel));
}
} else {
duplexChannels.get(uuid).upload(req,rsp);
}
} catch (InterruptedException e) {
throw new IOException(e);
};
}
}
}
......
......@@ -71,6 +71,8 @@ import java.util.Locale;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
/**
* Base class for Hudson CLI.
......@@ -159,6 +161,11 @@ public abstract class CLICommand implements ExtensionPoint, Cloneable {
*/
public transient Locale locale;
/**
* The encoding of the client, if defined.
*/
private transient @CheckForNull Charset encoding;
/**
* Set by the caller of the CLI system if the transport already provides
* authentication. Due to the compatibility issue, we still allow the user
......@@ -482,7 +489,18 @@ public abstract class CLICommand implements ExtensionPoint, Cloneable {
private static final long serialVersionUID = 1L;
}
protected Charset getClientCharset() throws IOException, InterruptedException {
/**
* Define the encoding for the command.
* @since FIXME
*/
public void setClientCharset(@Nonnull Charset encoding) {
this.encoding = encoding;
}
protected @Nonnull Charset getClientCharset() throws IOException, InterruptedException {
if (encoding != null) {
return encoding;
}
if (channel==null)
// for SSH, assume the platform default encoding
// this is in-line with the standard SSH behavior
......
......@@ -23,142 +23,67 @@
*/
package hudson.model;
import jenkins.util.SystemProperties;
import hudson.remoting.Channel;
import hudson.remoting.PingThread;
import hudson.remoting.Channel.Mode;
import hudson.util.ChunkedOutputStream;
import hudson.util.ChunkedInputStream;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import org.kohsuke.stapler.StaplerRequest;
import org.kohsuke.stapler.StaplerResponse;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.util.FullDuplexHttpService;
/**
* Builds a {@link Channel} on top of two HTTP streams (one used for each direction.)
*
* @author Kohsuke Kawaguchi
*/
abstract public class FullDuplexHttpChannel {
abstract public class FullDuplexHttpChannel extends FullDuplexHttpService {
private Channel channel;
private InputStream upload;
private final UUID uuid;
private final boolean restricted;
private boolean completed;
public FullDuplexHttpChannel(UUID uuid, boolean restricted) throws IOException {
this.uuid = uuid;
super(uuid);
this.restricted = restricted;
}
/**
* This is where we send the data to the client.
*
* <p>
* If this connection is lost, we'll abort the channel.
*/
public synchronized void download(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
// server->client channel.
// this is created first, and this controls the lifespan of the channel
rsp.addHeader("Transfer-Encoding", "chunked");
OutputStream out = rsp.getOutputStream();
if (DIY_CHUNKING) out = new ChunkedOutputStream(out);
// send something out so that the client will see the HTTP headers
out.write("Starting HTTP duplex channel".getBytes());
out.flush();
{// wait until we have the other channel
long end = System.currentTimeMillis() + CONNECTION_TIMEOUT;
while (upload == null && System.currentTimeMillis()<end)
wait(1000);
if (upload==null)
throw new IOException("HTTP full-duplex channel timeout: "+uuid);
}
try {
channel = new Channel("HTTP full-duplex channel " + uuid,
Computer.threadPoolForRemoting, Mode.BINARY, upload, out, null, restricted);
// so that we can detect dead clients, periodically send something
PingThread ping = new PingThread(channel) {
@Override
protected void onDead(Throwable diagnosis) {
LOGGER.log(Level.INFO,"Duplex-HTTP session " + uuid + " is terminated",diagnosis);
// this will cause the channel to abort and subsequently clean up
try {
upload.close();
} catch (IOException e) {
// this can never happen
throw new AssertionError(e);
}
}
@Override
protected void onDead() {
onDead(null);
@Override
protected void run(final InputStream upload, OutputStream download) throws IOException, InterruptedException {
channel = new Channel("HTTP full-duplex channel " + uuid,
Computer.threadPoolForRemoting, Mode.BINARY, upload, download, null, restricted);
// so that we can detect dead clients, periodically send something
PingThread ping = new PingThread(channel) {
@Override
protected void onDead(Throwable diagnosis) {
LOGGER.log(Level.INFO, "Duplex-HTTP session " + uuid + " is terminated", diagnosis);
// this will cause the channel to abort and subsequently clean up
try {
upload.close();
} catch (IOException e) {
// this can never happen
throw new AssertionError(e);
}
};
ping.start();
main(channel);
channel.join();
ping.interrupt();
} finally {
// publish that we are done
completed=true;
notify();
}
}
@Override
protected void onDead() {
onDead(null);
}
};
ping.start();
main(channel);
channel.join();
ping.interrupt();
}
protected abstract void main(Channel channel) throws IOException, InterruptedException;
/**
* This is where we receive inputs from the client.
*/
public synchronized void upload(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
InputStream in = req.getInputStream();
if(DIY_CHUNKING) in = new ChunkedInputStream(in);
// publish the upload channel
upload = in;
notify();
// wait until we are done
while (!completed)
wait();
}
public Channel getChannel() {
return channel;
}
private static final Logger LOGGER = Logger.getLogger(FullDuplexHttpChannel.class.getName());
/**
* Set to true if the servlet container doesn't support chunked encoding.
*/
@Restricted(NoExternalUse.class)
public static boolean DIY_CHUNKING = SystemProperties.getBoolean("hudson.diyChunking");
/**
* Controls the time out of waiting for the 2nd HTTP request to arrive.
*/
@Restricted(NoExternalUse.class)
public static long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
}
/*
* The MIT License
*
* Copyright 2017 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package jenkins.util;
import hudson.cli.FullDuplexHttpStream;
import hudson.util.ChunkedInputStream;
import hudson.util.ChunkedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import org.kohsuke.stapler.HttpResponses;
import org.kohsuke.stapler.StaplerRequest;
import org.kohsuke.stapler.StaplerResponse;
/**
* Server-side counterpart to {@link FullDuplexHttpStream}.
* @since FIXME
*/
public abstract class FullDuplexHttpService {
/**
* Set to true if the servlet container doesn't support chunked encoding.
*/
@Restricted(NoExternalUse.class)
public static boolean DIY_CHUNKING = SystemProperties.getBoolean("hudson.diyChunking");
/**
* Controls the time out of waiting for the 2nd HTTP request to arrive.
*/
@Restricted(NoExternalUse.class)
public static long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
protected final UUID uuid;
private InputStream upload;
private boolean completed;
protected FullDuplexHttpService(UUID uuid) {
this.uuid = uuid;
}
/**
* This is where we send the data to the client.
*
* <p>
* If this connection is lost, we'll abort the channel.
*/
public synchronized void download(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
// server->client channel.
// this is created first, and this controls the lifespan of the channel
rsp.addHeader("Transfer-Encoding", "chunked");
OutputStream out = rsp.getOutputStream();
if (DIY_CHUNKING) {
out = new ChunkedOutputStream(out);
}
// send something out so that the client will see the HTTP headers
out.write(0);
out.flush();
{// wait until we have the other channel
long end = System.currentTimeMillis() + CONNECTION_TIMEOUT;
while (upload == null && System.currentTimeMillis() < end) {
wait(1000);
}
if (upload == null) {
throw new IOException("HTTP full-duplex channel timeout: " + uuid);
}
}
try {
run(upload, out);
} finally {
// publish that we are done
completed = true;
notify();
}
}
protected abstract void run(InputStream upload, OutputStream download) throws IOException, InterruptedException;
/**
* This is where we receive inputs from the client.
*/
public synchronized void upload(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
InputStream in = req.getInputStream();
if (DIY_CHUNKING) {
in = new ChunkedInputStream(in);
}
// publish the upload channel
upload = in;
notify();
// wait until we are done
while (!completed) {
wait(); // TODO this can wait indefinitely even after the connection is broken
}
}
/**
* HTTP response that allows a client to use this service.
*/
public static abstract class Response extends HttpResponses.HttpResponseException {
private final Map<UUID, FullDuplexHttpService> services;
/**
* @param services a cross-request cache of services, to correlate the
* upload and download connections
*/
protected Response(Map<UUID, FullDuplexHttpService> services) {
this.services = services;
}
@Override
public void generateResponse(StaplerRequest req, StaplerResponse rsp, Object node) throws IOException, ServletException {
try {
// do not require any permission to establish a CLI connection
// the actual authentication for the connecting Channel is done by CLICommand
UUID uuid = UUID.fromString(req.getHeader("Session"));
rsp.setHeader("Hudson-Duplex", ""); // set the header so that the client would know
if (req.getHeader("Side").equals("download")) {
FullDuplexHttpService service = createService(req, uuid);
services.put(uuid, service);
try {
service.download(req, rsp);
} finally {
services.remove(uuid);
}
} else {
services.get(uuid).upload(req, rsp);
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
protected abstract FullDuplexHttpService createService(StaplerRequest req, UUID uuid) throws IOException, InterruptedException;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册