提交 7926a204 编写于 作者: K kohsuke

added a CLI entry point for Hudson over HTTP by adding full-duplex byte stream...

added a CLI entry point for Hudson over HTTP by adding full-duplex byte stream over HTTP. Note that this ties two threads on the server side, so it's not meant for a long-running sessions

git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@17544 71c3de6d-444a-0410-be80-ed276b4c234a
上级 9c617037
<?xml version="1.0" encoding="UTF-8"?>
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pom</artifactId>
<groupId>org.jvnet.hudson.main</groupId>
<version>1.302-SNAPSHOT</version>
</parent>
<groupId>org.jvnet.hudson.main</groupId>
<artifactId>cli</artifactId>
<name>Hudson CLI</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>remoting</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
</project>
package hudson.cli;
import hudson.remoting.Callable;
import hudson.remoting.Channel;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.lang.reflect.Method;
/**
* @author Kohsuke Kawaguchi
*/
public class CLI {
public static void main(final String[] args) throws Exception {
URL target = new URL("http://localhost:8080/duplexChannel");
ChunkedHttpStreamPair con = new ChunkedHttpStreamPair(target);
ExecutorService pool = Executors.newCachedThreadPool();
Channel channel = new Channel("Chunked connection to "+target,
pool,con.getInputStream(),con.getOutputStream());
// execute the command
int r=-1;
try {
r = channel.call(new Callable<Integer,Exception>() {
public Integer call() throws Exception {
Method m = Class.forName("hudson.model.Hudson").getMethod("cli", String[].class);
return (Integer)m.invoke(null,new Object[]{args});
}
});
} finally {
channel.close();
pool.shutdown();
}
System.exit(r);
}
}
package hudson.cli;
import hudson.cli.SequenceOutputStream.Block;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.UUID;
/**
* Creates a capacity-unlimited bi-directional {@link InputStream}/{@link OutputStream} pair over
* HTTP, which is a request/response protocol.
*
* @author Kohsuke Kawaguchi
*/
public class ChunkedHttpStreamPair {
private final URL target;
/**
* Uniquely identifies this connection, so that the server can bundle separate HTTP requests together.
*/
private final UUID uuid = UUID.randomUUID();
private final OutputStream output;
private final InputStream input;
public InputStream getInputStream() {
return input;
}
public OutputStream getOutputStream() {
return output;
}
public ChunkedHttpStreamPair(URL target) throws IOException {
this.target = target;
// server->client side is very simple
HttpURLConnection con = (HttpURLConnection) target.openConnection();
con.setDoOutput(true); // request POST
con.setRequestMethod("POST");
con.addRequestProperty("Session",uuid.toString());
con.addRequestProperty("Side","download");
con.getOutputStream().close();
input = con.getInputStream();
// client->server needs to be chopped up into blocks since URLConnection
// doesn't allow POST without Content-Length
// output = new SequenceOutputStream(new HttpBlock(makeConnection())) {
// protected Block next(Block current) throws IOException {
// // wait for the server to finish the response
// // before initiating the next connection. This guarantees
// // that the uploaded data is handled in-order by the server
// ((HttpBlock)current).close();
//
// return new HttpBlock(makeConnection());
// }
// };
// chunked encoding
con = (HttpURLConnection) target.openConnection();
con.setDoOutput(true); // request POST
con.setRequestMethod("POST");
con.setChunkedStreamingMode(0);
con.addRequestProperty("Session",uuid.toString());
con.addRequestProperty("Side","upload");
output = con.getOutputStream();
}
private class HttpBlock extends Block {
private final HttpURLConnection con;
private HttpBlock(HttpURLConnection con) throws IOException {
super(new FilterOutputStream(con.getOutputStream()) {
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
}, BLOCK_SIZE);
this.con = con;
}
void close() throws IOException {
con.getInputStream().read();
}
}
private HttpURLConnection makeConnection() throws IOException {
HttpURLConnection con = (HttpURLConnection) target.openConnection();
con.setDoOutput(true); // request POST
con.setRequestMethod("POST");
con.addRequestProperty("User-Agent","Hudson");
con.addRequestProperty("Session",uuid.toString());
con.addRequestProperty("Side","upload");
con.setFixedLengthStreamingMode(BLOCK_SIZE);
con.connect();
return con;
}
static final int BLOCK_SIZE = 1024;
}
package hudson.cli;
import java.io.OutputStream;
import java.io.IOException;
import java.io.SequenceInputStream;
/**
* {@link OutputStream} version of {@link SequenceInputStream}.
*
* Provides a single {@link OutputStream} view over multiple {@link OutputStream}s (each of the fixed length.)
*
* @author Kohsuke Kawaguchi
*/
abstract class SequenceOutputStream extends OutputStream {
protected static class Block {
final OutputStream out;
long capacity;
public Block(OutputStream out, long capacity) {
this.out = out;
this.capacity = capacity;
}
}
/**
* Current block being written.
*/
private Block block;
protected SequenceOutputStream(Block block) {
this.block = block;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
while(len>0) {
int sz = (int)Math.min(len, block.capacity);
block.out.write(b,off,sz);
block.capacity -=sz;
len-=sz;
off+=sz;
swapIfNeeded();
}
}
public void write(int b) throws IOException {
block.out.write(b);
block.capacity--;
swapIfNeeded();
}
private void swapIfNeeded() throws IOException {
if(block.capacity >0) return;
block.out.close();
block=next(block);
}
@Override
public void flush() throws IOException {
block.out.flush();
}
@Override
public void close() throws IOException {
block.out.close();
block=null;
}
/**
* Fetches the next {@link OutputStream} to write to,
* along with their capacity.
*/
protected abstract Block next(Block current) throws IOException;
}
package hudson.model;
import hudson.remoting.Channel;
import hudson.remoting.PingThread;
import org.apache.commons.io.IOUtils;
import org.kohsuke.stapler.StaplerRequest;
import org.kohsuke.stapler.StaplerResponse;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.logging.Logger;
/**
* Builds a {@link Channel} on top of two HTTP streams (one used for each direction.)
*
* @author Kohsuke Kawaguchi
*/
final class FullDuplexHttpChannel {
private Channel channel;
private final PipedOutputStream pipe = new PipedOutputStream();
private final UUID uuid;
public FullDuplexHttpChannel(UUID uuid) throws IOException {
this.uuid = uuid;
}
/**
* This is where we send the data to the client.
*
* <p>
* If this connection is lost, we'll abort the channel.
*/
public void download(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
// server->client channel.
// this is created first, and this controls the lifespan of the channel
rsp.addHeader("Transfer-Encoding", "chunked");
channel = new Channel("HTTP full-duplex channel " + uuid,
Computer.threadPoolForRemoting, new PipedInputStream(pipe), rsp.getOutputStream());
// so that we can detect dead clients, periodically send something
PingThread ping = new PingThread(channel) {
@Override
protected void onDead() {
LOGGER.info("Duplex-HTTP session " + uuid + " is terminated");
// this will cause the channel to abort and subsequently clean up
try {
pipe.close();
} catch (IOException e) {
// this can never happen
throw new AssertionError(e);
}
}
};
ping.start();
channel.join();
ping.interrupt();
}
/**
* This is where we receive inputs from the client.
*/
public void upload(StaplerRequest req, StaplerResponse rsp) throws InterruptedException, IOException {
rsp.setStatus(HttpServletResponse.SC_OK);
IOUtils.copy(req.getInputStream(),pipe);
}
public Channel getChannel() {
return channel;
}
private static final Logger LOGGER = Logger.getLogger(FullDuplexHttpChannel.class.getName());
}
......@@ -53,6 +53,7 @@ import hudson.model.listeners.JobListener.JobListenerAdapter;
import hudson.model.listeners.SCMListener;
import hudson.remoting.LocalChannel;
import hudson.remoting.VirtualChannel;
import hudson.remoting.Channel;
import hudson.scm.CVSSCM;
import hudson.scm.RepositoryBrowser;
import hudson.scm.SCM;
......@@ -168,6 +169,7 @@ import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TreeSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
......@@ -2654,6 +2656,37 @@ public final class Hudson extends Node implements ItemGroup<TopLevelItem>, Stapl
rsp.getWriter().println("GCed");
}
private transient final Map<UUID,FullDuplexHttpChannel> duplexChannels = new HashMap<UUID, FullDuplexHttpChannel>();
/**
* Handles HTTP requests for duplex channels.
*/
public void doDuplexChannel(StaplerRequest req, StaplerResponse rsp) throws IOException, ServletException, InterruptedException {
requirePOST();
UUID uuid = UUID.fromString(req.getHeader("Session"));
FullDuplexHttpChannel server;
if(req.getHeader("Side").equals("download")) {
duplexChannels.put(uuid,server=new FullDuplexHttpChannel(uuid));
try {
server.download(req,rsp);
} finally {
duplexChannels.remove(uuid);
}
} else {
duplexChannels.get(uuid).upload(req,rsp);
}
}
/**
* Called by the CLI over a {@link Channel} to execute an CLI command.
*/
public static int cli(String... args) {
System.out.println(Arrays.asList(args));
return 0;
}
/**
* Binds /userContent/... to $HUDSON_HOME/userContent.
*/
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
The MIT License
......@@ -21,7 +22,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
......@@ -47,6 +47,7 @@ THE SOFTWARE.
<module>maven-interceptor</module>
<module>war</module>
<module>test</module>
<module>cli</module>
</modules>
<scm>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册