提交 b7304fa8 编写于 作者: A alanharder

Merge branch 'master' of github.com:jenkinsci/jenkins

......@@ -81,6 +81,9 @@ Upcoming changes</a>
(<a href="http://issues.jenkins-ci.org/browse/JENKINS-7275">issue 7275</a>)
<li class=rfe>
<tt>BuildWrapper</tt>s can now act on the build in progress before the checkout occurs.
<li class=rfe>
Improved the process forking abstractions so that plugins can more easily read from child processes.
(<a href="http://issues.jenkins-ci.org/browse/JENKINS-7809">issue 7809</a>)
</ul>
</div><!--=TRUNK-END=-->
......
/*
* The MIT License
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, Stephen Connolly
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, Stephen Connolly, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
......@@ -24,7 +24,6 @@
package hudson;
import hudson.Proc.LocalProc;
import hudson.Proc.RemoteProc;
import hudson.model.Computer;
import hudson.model.Hudson;
import hudson.model.TaskListener;
......@@ -44,7 +43,9 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.List;
......@@ -144,8 +145,19 @@ public abstract class Launcher {
protected boolean[] masks;
protected FilePath pwd;
protected OutputStream stdout = NULL_OUTPUT_STREAM, stderr;
protected InputStream stdin = new NullInputStream(0);
protected InputStream stdin = NULL_INPUT_STREAM;
protected String[] envs;
/**
* True to reverse the I/O direction.
*
* For example, if {@link #reverseStdout}==true, then we expose
* {@link InputStream} from {@link Proc} and expect the client to read from it,
* whereas normally we take {@link OutputStream} via {@link #stdout(OutputStream)}
* and feed stdout into that output.
*
* @since 1.399
*/
protected boolean reverseStdin, reverseStdout, reverseStderr;
public ProcStarter cmds(String... args) {
return cmds(Arrays.asList(args));
......@@ -266,6 +278,50 @@ public abstract class Launcher {
return envs;
}
/**
* Indicates that the caller will pump {@code stdout} from the child process
* via {@link Proc#getStdout()} (whereas by default you call {@link #stdout(OutputStream)}
* and let Jenkins pump stdout into your {@link OutputStream} of choosing.
*
* <p>
* When this method is called, {@link Proc#getStdout()} will read the combined output
* of {@code stdout/stderr} from the child process, unless {@link #readStderr()} is called
* separately, which lets the caller read those two streams separately.
*
* @since 1.399
*/
public ProcStarter readStdout() {
reverseStdout = true;
stdout = stderr = null;
return this;
}
/**
* In addition to the effect of {@link #readStdout()}, indicate that the caller will pump {@code stderr}
* from the child process separately from {@code stdout}. The stderr will be readable from
* {@link Proc#getStderr()} while {@link Proc#getStdout()} reads from stdout.
*
* @since 1.399
*/
public ProcStarter readStderr() {
reverseStdout = true;
reverseStderr = true;
return this;
}
/**
* Indicates that the caller will directly write to the child process {@code stin} }via
* {@link Proc#getStdin()} (whereas by default you call {@link #stdin(InputStream)}
* and let Jenkins pump your {@link InputStream} of choosing to stdin.
* @since 1.399
*/
public ProcStarter writeStdin() {
reverseStdin = true;
stdin = null;
return this;
}
/**
* Starts the new process as configured.
*/
......@@ -284,7 +340,11 @@ public abstract class Launcher {
* Copies a {@link ProcStarter}.
*/
public ProcStarter copy() {
return new ProcStarter().cmds(commands).pwd(pwd).masks(masks).stdin(stdin).stdout(stdout).stderr(stderr).envs(envs);
ProcStarter rhs = new ProcStarter().cmds(commands).pwd(pwd).masks(masks).stdin(stdin).stdout(stdout).stderr(stderr).envs(envs);
rhs.reverseStdin = this.reverseStdin;
rhs.reverseStderr = this.reverseStderr;
rhs.reverseStdout = this.reverseStdout;
return rhs;
}
}
......@@ -635,7 +695,11 @@ public abstract class Launcher {
for ( int idx = 0 ; idx < jobCmd.length; idx++ )
jobCmd[idx] = jobEnv.expand(ps.commands.get(idx));
return new LocalProc(jobCmd, Util.mapToEnv(jobEnv), ps.stdin, ps.stdout, ps.stderr, toFile(ps.pwd));
return new LocalProc(jobCmd, Util.mapToEnv(jobEnv),
ps.reverseStdin ?LocalProc.SELFPUMP_INPUT:ps.stdin,
ps.reverseStdout?LocalProc.SELFPUMP_OUTPUT:ps.stdout,
ps.reverseStderr?LocalProc.SELFPUMP_OUTPUT:ps.stderr,
toFile(ps.pwd));
}
private File toFile(FilePath f) {
......@@ -716,10 +780,14 @@ public abstract class Launcher {
public Proc launch(ProcStarter ps) throws IOException {
final OutputStream out = ps.stdout == null ? null : new RemoteOutputStream(new CloseProofOutputStream(ps.stdout));
final OutputStream err = ps.stderr==null ? null : new RemoteOutputStream(new CloseProofOutputStream(ps.stderr));
final InputStream in = ps.stdin==null ? null : new RemoteInputStream(ps.stdin);
final InputStream in = (ps.stdin==null || ps.stdin==NULL_INPUT_STREAM) ? null : new RemoteInputStream(ps.stdin,false);
final String workDir = ps.pwd==null ? null : ps.pwd.getRemote();
return new RemoteProc(getChannel().callAsync(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, out, err, workDir, listener)));
try {
return new ProcImpl(getChannel().call(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, ps.reverseStdin, out, ps.reverseStdout, err, ps.reverseStderr, workDir, listener)));
} catch (InterruptedException e) {
throw (IOException)new InterruptedIOException().initCause(e);
}
}
public Channel launchChannel(String[] cmd, OutputStream err, FilePath _workDir, Map<String,String> envOverrides) throws IOException, InterruptedException {
......@@ -762,9 +830,64 @@ public abstract class Launcher {
private static final long serialVersionUID = 1L;
}
public static final class ProcImpl extends Proc {
private final RemoteProcess process;
private final IOTriplet io;
public ProcImpl(RemoteProcess process) {
this.process = process;
this.io = process.getIOtriplet();
}
@Override
public void kill() throws IOException, InterruptedException {
process.kill();
}
@Override
public int join() throws IOException, InterruptedException {
return process.join();
}
@Override
public boolean isAlive() throws IOException, InterruptedException {
return process.isAlive();
}
@Override
public InputStream getStdout() {
return io.stdout;
}
@Override
public InputStream getStderr() {
return io.stderr;
}
@Override
public OutputStream getStdin() {
return io.stdin;
}
}
}
public static class IOTriplet implements Serializable {
InputStream stdout,stderr;
OutputStream stdin;
private static final long serialVersionUID = 1L;
}
/**
* Remoting interface of a remote process
*/
public static interface RemoteProcess {
int join() throws InterruptedException, IOException;
void kill() throws IOException, InterruptedException;
boolean isAlive() throws IOException, InterruptedException;
IOTriplet getIOtriplet();
}
private static class RemoteLaunchCallable implements Callable<Integer,IOException> {
private static class RemoteLaunchCallable implements Callable<RemoteProcess,IOException> {
private final List<String> cmd;
private final boolean[] masks;
private final String[] env;
......@@ -773,8 +896,9 @@ public abstract class Launcher {
private final OutputStream err;
private final String workDir;
private final TaskListener listener;
private final boolean reverseStdin, reverseStdout, reverseStderr;
RemoteLaunchCallable(List<String> cmd, boolean[] masks, String[] env, InputStream in, OutputStream out, OutputStream err, String workDir, TaskListener listener) {
RemoteLaunchCallable(List<String> cmd, boolean[] masks, String[] env, InputStream in, boolean reverseStdin, OutputStream out, boolean reverseStdout, OutputStream err, boolean reverseStderr, String workDir, TaskListener listener) {
this.cmd = new ArrayList<String>(cmd);
this.masks = masks;
this.env = env;
......@@ -783,26 +907,51 @@ public abstract class Launcher {
this.err = err;
this.workDir = workDir;
this.listener = listener;
this.reverseStdin = reverseStdin;
this.reverseStdout = reverseStdout;
this.reverseStderr = reverseStderr;
}
public Integer call() throws IOException {
public RemoteProcess call() throws IOException {
Launcher.ProcStarter ps = new LocalLauncher(listener).launch();
ps.cmds(cmd).masks(masks).envs(env).stdin(in).stdout(out).stderr(err);
if(workDir!=null) ps.pwd(workDir);
if (reverseStdin) ps.writeStdin();
if (reverseStdout) ps.readStdout();
if (reverseStderr) ps.readStderr();
Proc p = ps.start();
try {
return p.join();
} catch (InterruptedException e) {
return -1;
} finally {
// make sure I/O is delivered to the remote before we return
try {
Channel.current().syncIO();
} catch (Throwable _) {
// this includes a failure to sync, slave.jar too old, etc
final Proc p = ps.start();
return Channel.current().export(RemoteProcess.class,new RemoteProcess() {
public int join() throws InterruptedException, IOException {
try {
return p.join();
} finally {
// make sure I/O is delivered to the remote before we return
try {
Channel.current().syncIO();
} catch (Throwable _) {
// this includes a failure to sync, slave.jar too old, etc
}
}
}
}
public void kill() throws IOException, InterruptedException {
p.kill();
}
public boolean isAlive() throws IOException, InterruptedException {
return p.isAlive();
}
public IOTriplet getIOtriplet() {
IOTriplet r = new IOTriplet();
if (reverseStdout) r.stdout = new RemoteInputStream(p.getStdout());
if (reverseStderr) r.stderr = new RemoteInputStream(p.getStderr());
if (reverseStdin) r.stdin = new RemoteOutputStream(p.getStdin());
return r;
}
});
}
private static final long serialVersionUID = 1L;
......@@ -873,5 +1022,7 @@ public abstract class Launcher {
*/
public static boolean showFullPath = false;
private static final NullInputStream NULL_INPUT_STREAM = new NullInputStream(0);
private static final Logger LOGGER = Logger.getLogger(Launcher.class.getName());
}
/*
* The MIT License
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
......@@ -23,11 +23,14 @@
*/
package hudson;
import hudson.Launcher.ProcStarter;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.util.IOException2;
import hudson.util.NullStream;
import hudson.util.StreamCopyThread;
import hudson.util.ProcessTree;
import org.apache.commons.io.input.NullInputStream;
import java.io.File;
import java.io.IOException;
......@@ -71,7 +74,10 @@ public abstract class Proc {
public abstract void kill() throws IOException, InterruptedException;
/**
* Waits for the completion of the process and until we finish reading everything that the process has produced
* Waits for the completion of the process.
*
* Unless the caller opts to pump the streams via {@link #getStdout()} etc.,
* this method also blocks until we finish reading everything that the process has produced
* to stdout/stderr.
*
* <p>
......@@ -85,6 +91,45 @@ public abstract class Proc {
*/
public abstract int join() throws IOException, InterruptedException;
/**
* Returns an {@link InputStream} to read from {@code stdout} of the child process.
* <p>
* When this method returns null, {@link Proc} will internally pump the output from
* the child process to your {@link OutputStream} of choosing.
*
* @return
* null unless {@link ProcStarter#readStdout()} is used to indicate
* that the caller intends to pump the stream by itself.
* @since 1.399
*/
public abstract InputStream getStdout();
/**
* Returns an {@link InputStream} to read from {@code stderr} of the child process.
* <p>
* When this method returns null, {@link Proc} will internally pump the output from
* the child process to your {@link OutputStream} of choosing.
*
* @return
* null unless {@link ProcStarter#readStderr()} is used to indicate
* that the caller intends to pump the stream by itself.
* @since 1.399
*/
public abstract InputStream getStderr();
/**
* Returns an {@link OutputStream} to write to {@code stdin} of the child process.
* <p>
* When this method returns null, {@link Proc} will internally pump the {@link InputStream}
* of your choosing to the child process.
*
* @return
* null unless {@link ProcStarter#writeStdin()} is used to indicate
* that the caller intends to pump the stream by itself.
* @since 1.399
*/
public abstract OutputStream getStdin();
private static final ExecutorService executor = Executors.newCachedThreadPool();
/**
* Like {@link #join} but can be given a maximum time to wait.
......@@ -133,6 +178,9 @@ public abstract class Proc {
private final EnvVars cookie;
private final String name;
private final InputStream stdout,stderr;
private final OutputStream stdin;
public LocalProc(String cmd, Map<String,String> env, OutputStream out, File workDir) throws IOException {
this(cmd,Util.mapToEnv(env),out,workDir);
}
......@@ -163,12 +211,12 @@ public abstract class Proc {
*/
public LocalProc(String[] cmd,String[] env,InputStream in,OutputStream out,OutputStream err,File workDir) throws IOException {
this( calcName(cmd),
stderr(environment(new ProcessBuilder(cmd),env).directory(workDir),err),
stderr(environment(new ProcessBuilder(cmd),env).directory(workDir), err==null || err== SELFPUMP_OUTPUT),
in, out, err );
}
private static ProcessBuilder stderr(ProcessBuilder pb, OutputStream stderr) {
if(stderr==null) pb.redirectErrorStream(true);
private static ProcessBuilder stderr(ProcessBuilder pb, boolean redirectError) {
if(redirectError) pb.redirectErrorStream(true);
return pb;
}
......@@ -191,17 +239,39 @@ public abstract class Proc {
this.cookie = EnvVars.createCookie();
procBuilder.environment().putAll(cookie);
this.proc = procBuilder.start();
InputStream procInputStream = proc.getInputStream();
copier = new StreamCopyThread(name+": stdout copier", procInputStream, out);
copier.start();
if(in!=null)
new StdinCopyThread(name+": stdin copier",in,proc.getOutputStream()).start();
else
if (out==SELFPUMP_OUTPUT) {
stdout = procInputStream;
copier = null;
} else {
copier = new StreamCopyThread(name+": stdout copier", procInputStream, out);
copier.start();
stdout = null;
}
if (in == null) {
// nothing to feed to stdin
stdin = null;
proc.getOutputStream().close();
} else
if (in==SELFPUMP_INPUT) {
stdin = proc.getOutputStream();
} else {
new StdinCopyThread(name+": stdin copier",in,proc.getOutputStream()).start();
stdin = null;
}
InputStream procErrorStream = proc.getErrorStream();
if(err!=null) {
copier2 = new StreamCopyThread(name+": stderr copier", procErrorStream, err);
copier2.start();
if (err==SELFPUMP_OUTPUT) {
stderr = procErrorStream;
copier2 = null;
} else {
stderr = null;
copier2 = new StreamCopyThread(name+": stderr copier", procErrorStream, err);
copier2.start();
}
} else {
// the javadoc is unclear about what getErrorStream() returns when ProcessBuilder.redirectErrorStream(true),
//
......@@ -212,9 +282,22 @@ public abstract class Proc {
procErrorStream.close();
}
copier2 = null;
stderr = null;
}
}
public InputStream getStdout() {
return stdout;
}
public InputStream getStderr() {
return stderr;
}
public OutputStream getStdin() {
return stdin;
}
/**
* Waits for the completion of the process.
*/
......@@ -234,9 +317,9 @@ public abstract class Proc {
// see http://wiki.jenkins-ci.org/display/JENKINS/Spawning+processes+from+build
// problems like that shows up as infinite wait in join(), which confuses great many users.
// So let's do a timed wait here and try to diagnose the problem
copier.join(10*1000);
if (copier!=null) copier.join(10*1000);
if(copier2!=null) copier2.join(10*1000);
if(copier.isAlive() || (copier2!=null && copier2.isAlive())) {
if((copier!=null && copier.isAlive()) || (copier2!=null && copier2.isAlive())) {
// looks like handles are leaking.
// closing these handles should terminate the threads.
String msg = "Process leaked file descriptors. See http://wiki.jenkins-ci.org/display/JENKINS/Spawning+processes+from+build for more information";
......@@ -335,10 +418,15 @@ public abstract class Proc {
}
return buf.toString();
}
public static final InputStream SELFPUMP_INPUT = new NullInputStream(0);
public static final OutputStream SELFPUMP_OUTPUT = new NullStream();
}
/**
* Remotely launched process via {@link Channel}.
*
* @deprecated as of 1.399
*/
public static final class RemoteProc extends Proc {
private final Future<Integer> process;
......@@ -373,6 +461,21 @@ public abstract class Proc {
public boolean isAlive() throws IOException, InterruptedException {
return !process.isDone();
}
@Override
public InputStream getStdout() {
return null;
}
@Override
public InputStream getStderr() {
return null;
}
@Override
public OutputStream getStdin() {
return null;
}
}
private static final Logger LOGGER = Logger.getLogger(Proc.class.getName());
......
......@@ -14,6 +14,7 @@ import org.jvnet.hudson.remcom.WindowsRemoteProcessLauncher;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
......@@ -89,6 +90,21 @@ public class WindowsRemoteLauncher extends Launcher {
proc.destroy();
}
}
@Override
public InputStream getStdout() {
throw new UnsupportedOperationException();
}
@Override
public InputStream getStderr() {
throw new UnsupportedOperationException();
}
@Override
public OutputStream getStdin() {
throw new UnsupportedOperationException();
}
};
}
......
......@@ -155,11 +155,18 @@ public abstract class ProcessTree implements Iterable<OSProcess>, IProcessTree,
/*package*/ final List<ProcessKiller> getKillers() throws InterruptedException {
if (killers==null)
try {
killers = SlaveComputer.getChannelToMaster().call(new Callable<List<ProcessKiller>, IOException>() {
public List<ProcessKiller> call() throws IOException {
return new ArrayList<ProcessKiller>(ProcessKiller.all());
}
});
VirtualChannel channelToMaster = SlaveComputer.getChannelToMaster();
if (channelToMaster!=null) {
killers = channelToMaster.call(new Callable<List<ProcessKiller>, IOException>() {
public List<ProcessKiller> call() throws IOException {
return new ArrayList<ProcessKiller>(ProcessKiller.all());
}
});
} else {
// used in an environment that doesn't support talk-back to the master.
// let's do with what we have.
killers = Collections.emptyList();
}
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failed to obtain killers",e);
killers = Collections.emptyList();
......
......@@ -28,11 +28,12 @@ import hudson.util.ProcessTree;
import hudson.util.StreamTaskListener;
import hudson.remoting.Callable;
import org.apache.commons.io.FileUtils;
import org.jvnet.hudson.test.Bug;
import java.io.File;
public class LauncherTest extends ChannelTestCase {
//@Bug(4611)
@Bug(4611)
public void testRemoteKill() throws Exception {
if (File.pathSeparatorChar != ':') {
System.err.println("Skipping, currently Unix-specific test");
......
......@@ -39,13 +39,28 @@ import java.io.ObjectInputStream;
*/
public class RemoteInputStream extends InputStream implements Serializable {
private transient InputStream core;
private boolean autoUnexport;
/**
* Short for {@code RemoteInputStream(core,true)}.
*/
public RemoteInputStream(InputStream core) {
this(core,true);
}
/**
* @param autoUnexport
* If true, the {@link InputStream} will be automatically unexported when
* the callable that took it with returns. If false, it'll not unexported
* until the close method is called.
*/
public RemoteInputStream(InputStream core, boolean autoUnexport) {
this.core = core;
this.autoUnexport = autoUnexport;
}
private void writeObject(ObjectOutputStream oos) throws IOException {
int id = Channel.current().export(core);
int id = Channel.current().export(core,autoUnexport);
oos.writeInt(id);
}
......
......@@ -4,6 +4,8 @@ import hudson.Launcher.ProcStarter;
import hudson.Proc;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Fake a process launch.
......@@ -47,5 +49,23 @@ public interface FakeLauncher {
public int join() throws IOException, InterruptedException {
return exitCode;
}
@Override
public InputStream getStdout() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public InputStream getStderr() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public OutputStream getStdin() {
// TODO
throw new UnsupportedOperationException();
}
}
}
package hudson;
import hudson.Launcher.LocalLauncher;
import hudson.Launcher.RemoteLauncher;
import hudson.Proc.RemoteProc;
import hudson.remoting.Callable;
......@@ -12,6 +13,7 @@ import hudson.util.StreamTaskListener;
import org.jvnet.hudson.test.Bug;
import org.jvnet.hudson.test.HudsonTestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
......@@ -26,13 +28,7 @@ public class ProcTest extends HudsonTestCase {
*/
@Bug(7809)
public void testRemoteProcOutputSync() throws Exception {
DumbSlave s = createSlave();
s.toComputer().connect(false).get();
VirtualChannel ch=null;
while (ch==null) {
ch = s.toComputer().getChannel();
Thread.sleep(100);
}
VirtualChannel ch = createSlaveChannel();
// keep the pipe fairly busy
final Pipe p = Pipe.createRemoteToLocal();
......@@ -63,6 +59,17 @@ public class ProcTest extends HudsonTestCase {
ch.close();
}
private VirtualChannel createSlaveChannel() throws Exception {
DumbSlave s = createSlave();
s.toComputer().connect(false).get();
VirtualChannel ch=null;
while (ch==null) {
ch = s.toComputer().getChannel();
Thread.sleep(100);
}
return ch;
}
private static class ChannelFiller implements Callable<Void,IOException> {
private final OutputStream o;
......@@ -77,4 +84,38 @@ public class ProcTest extends HudsonTestCase {
return null;
}
}
@Bug(7809)
public void testIoPumpingWithLocalLaunch() throws Exception {
doIoPumpingTest(new LocalLauncher(new StreamTaskListener(System.out, Charset.defaultCharset())));
}
@Bug(7809)
public void testIoPumpingWithRemoteLaunch() throws Exception {
doIoPumpingTest(new RemoteLauncher(
new StreamTaskListener(System.out, Charset.defaultCharset()),
createSlaveChannel(), true));
}
private void doIoPumpingTest(Launcher l) throws IOException, InterruptedException {
String[] ECHO_BACK_CMD = {"cat"}; // TODO: what is the echo back command for Windows? "cmd /C copy CON CON"?
ByteArrayOutputStream out = new ByteArrayOutputStream();
l.launch().cmds(ECHO_BACK_CMD).stdin(new ByteArrayInputStream("Hello".getBytes())).stdout(out).join();
assertEquals("Hello",out.toString());
Proc p = l.launch().cmds(ECHO_BACK_CMD).stdin(new ByteArrayInputStream("Hello".getBytes())).readStdout().start();
p.join();
assertEquals("Hello", org.apache.commons.io.IOUtils.toString(p.getStdout()));
assertNull(p.getStderr());
assertNull(p.getStdin());
p = l.launch().cmds(ECHO_BACK_CMD).writeStdin().readStdout().start();
p.getStdin().write("Hello".getBytes());
p.getStdin().close();
p.join();
assertEquals("Hello", org.apache.commons.io.IOUtils.toString(p.getStdout()));
assertNull(p.getStderr());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册