提交 482f9a07 编写于 作者: K kohsuke

bug fixes. ObjectOutputStream needs to be flushed for certain kind of stream,...

bug fixes. ObjectOutputStream needs to be flushed for certain kind of stream, like process stream, or else we'll dead-lock.
expanded tests.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1306 71c3de6d-444a-0410-be80-ed276b4c234a
上级 470e4960
......@@ -6,6 +6,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Map;
......@@ -43,6 +45,10 @@ public class Channel {
*/
private final ExportTable<Object> exportedObjects = new ExportTable<Object>();
public Channel(String name, Executor exec, InputStream is, OutputStream os) throws IOException {
this(name,exec,is,os,null);
}
/**
*
* @param name
......@@ -53,10 +59,45 @@ public class Channel {
* Stream connected to the remote peer.
* @param os
* Stream connected to the remote peer.
* @param header
* If non-null, receive the portion of data in <tt>is</tt> before
* the data goes into the "binary mode". This is useful
* when the established communication channel might include some data that might
* be useful for debugging/trouble-shooting.
*/
public Channel(String name, Executor exec, InputStream is, OutputStream os) throws IOException {
public Channel(String name, Executor exec, InputStream is, OutputStream os, OutputStream header) throws IOException {
this.executor = exec;
// write the magic preamble.
// certain communication channel, such as forking JVM via ssh,
// may produce some garbage at the beginning (for example a remote machine
// might print some warning before the program starts outputting its own data.)
//
// so use magic preamble and discard all the data up to that to improve robustness.
os.write(new byte[]{0,0,0,0}); // preamble
this.oos = new ObjectOutputStream(os);
oos.flush(); // make sure that stream header is sent to the other end. avoids dead-lock
{// read the input until we hit preamble
int ch;
int count=0;
while(true) {
ch = is.read();
if(ch==-1) {
throw new EOFException("unexpected stream termination");
}
if(ch==0) {
count++;
if(count==4) break;
} else {
if(header!=null)
header.write(ch);
count=0;
}
}
}
this.ois = new ObjectInputStream(is);
new ReaderThread(name).start();
}
......@@ -76,6 +117,7 @@ public class Channel {
Channel old = Channel.setCurrent(this);
try {
oos.writeObject(cmd);
oos.flush(); // make sure the command reaches the other end.
} finally {
Channel.setCurrent(old);
}
......
package hudson.remoting;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
......@@ -12,8 +14,16 @@ import java.util.concurrent.Executors;
*/
public class Launcher {
public static void main(String[] args) throws Exception {
Channel channel = new Channel("channel", Executors.newCachedThreadPool(), System.in, System.out);
// this will prevent programs from accidentally writing to System.out
// and messing up the stream.
OutputStream os = System.out;
System.setOut(System.err);
ExecutorService executor = Executors.newCachedThreadPool();
Channel channel = new Channel("channel", executor, System.in, os);
System.err.println("channel started");
channel.join();
System.err.println("channel stopped");
System.exit(0);
}
}
......@@ -19,7 +19,8 @@ interface ChannelRunner {
String getName();
Class<? extends ChannelRunner>[] LIST = new Class[] {
InProcess.class, Fork.class
InProcess.class,
Fork.class
};
......@@ -88,6 +89,7 @@ interface ChannelRunner {
public Channel start() throws Exception {
System.out.println("forking a new process");
// proc = Runtime.getRuntime().exec("java -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000 hudson.remoting.Launcher");
proc = Runtime.getRuntime().exec("java hudson.remoting.Launcher");
copier = new Copier("copier",proc.getErrorStream(),System.err);
......
package hudson.remoting;
import junit.framework.Test;
/**
* Test class image forwarding.
*
* @author Kohsuke Kawaguchi
*/
public class ClassRemotingTest extends RmiTestBase {
......@@ -14,4 +18,8 @@ public class ClassRemotingTest extends RmiTestBase {
assertTrue(r.toString().startsWith("hudson.remoting.RemoteClassLoader@"));
}
public static Test suite() throws Exception {
return buildSuite(ClassRemotingTest.class);
}
}
package hudson.remoting;
import junit.framework.Test;
import java.io.OutputStream;
import java.io.IOException;
import java.io.InputStream;
......@@ -7,6 +9,8 @@ import java.util.Arrays;
import java.util.concurrent.Future;
/**
* Test {@link Pipe}.
*
* @author Kohsuke Kawaguchi
*/
public class PipeTest extends RmiTestBase {
......@@ -94,4 +98,8 @@ public class PipeTest extends RmiTestBase {
assertEquals(-1,in.read());
in.close();
}
public static Test suite() throws Exception {
return buildSuite(PipeTest.class);
}
}
package hudson.remoting;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
import java.io.IOException;
import java.io.PipedInputStream;
......@@ -11,6 +13,8 @@ import java.util.concurrent.Executors;
import hudson.remoting.ChannelRunner.InProcess;
/**
* Base class for remoting tests.
*
* @author Kohsuke Kawaguchi
*/
public abstract class RmiTestBase extends TestCase {
......@@ -36,8 +40,20 @@ public abstract class RmiTestBase extends TestCase {
}
}
public String getName() {
return super.getName()+"-"+channelRunner.getName();
}
/**
* Can be used in the suite method of the derived class to build a
* {@link TestSuite} to run the test witih all the available
* {@link ChannelRunner} configuration.
*/
protected static Test buildSuite(Class<? extends RmiTestBase> testClass) {
TestSuite suite = new TestSuite();
for( Class<? extends ChannelRunner> r : ChannelRunner.LIST ) {
suite.addTest(new ChannelTestSuite(testClass,r));
}
return suite;
}
}
package hudson.remoting;
import junit.framework.Test;
import junit.framework.TestSuite;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Testing the basic features.
*
* @author Kohsuke Kawaguchi
*/
public class SimpleTest extends RmiTestBase {
......@@ -24,7 +25,7 @@ public class SimpleTest extends RmiTestBase {
private static class Callable1 implements Callable<Integer, RuntimeException> {
public Integer call() throws RuntimeException {
System.out.println("invoked");
System.err.println("invoked");
return 5;
}
}
......@@ -55,12 +56,7 @@ public class SimpleTest extends RmiTestBase {
}
}
public static Test suite() throws Exception {
TestSuite suite = new TestSuite();
for( Class<? extends ChannelRunner> r : ChannelRunner.LIST ) {
suite.addTest(new ChannelTestSuite(SimpleTest.class,r));
}
return suite;
return buildSuite(SimpleTest.class);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册