提交 f263d849 编写于 作者: K kohsuke

Fixed a race condition that appear as test failures. This is caused by the...

Fixed a race condition that appear as test failures. This is caused by the remote peer sending us a request before we get to obtain the remote capacity.
I moved the capability exchange to the preamble negotiation to avoid this problem.

----
 java.lang.NullPointerException
	at hudson.remoting.UserRequest._serialize(UserRequest.java:136)
	at hudson.remoting.UserRequest.perform(UserRequest.java:118)
	at hudson.remoting.UserRequest.perform(UserRequest.java:48)
	at hudson.remoting.Request$2.run(Request.java:236)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@21132 71c3de6d-444a-0410-be80-ed276b4c234a
上级 40b844c8
package hudson.remoting;
import hudson.remoting.Channel.Mode;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.IOException;
/**
* Represents additional features implemented on {@link Channel}.
......@@ -38,5 +44,25 @@ final class Capability implements Serializable {
return (mask&1)!=0;
}
/**
* Writes out the capacity preamble.
*/
void writePreamble(OutputStream os) throws IOException {
os.write(PREAMBLE);
ObjectOutputStream oos = new ObjectOutputStream(Mode.TEXT.wrap(os));
oos.writeObject(this);
oos.flush();
}
private static final long serialVersionUID = 1L;
static final byte[] PREAMBLE;
static {
try {
PREAMBLE = "\n<===[HUDSON REMOTING CAPACITY]===>".getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new AssertionError(e);
}
}
}
......@@ -295,14 +295,15 @@ public class Channel implements VirtualChannel, IChannel {
throw new AssertionError(); // export number 1 is reserved for the channel itself
remoteChannel = RemoteInvocationHandler.wrap(this,1,IChannel.class,false,false);
properties.put(Capability.class,new Capability()); // export our capability
// write the magic preamble.
// certain communication channel, such as forking JVM via ssh,
// may produce some garbage at the beginning (for example a remote machine
// might print some warning before the program starts outputting its own data.)
//
// so use magic preamble and discard all the data up to that to improve robustness.
new Capability().writePreamble(os);
ObjectOutputStream oos = null;
if(mode!= Mode.NEGOTIATE) {
os.write(mode.preamble);
......@@ -311,39 +312,51 @@ public class Channel implements VirtualChannel, IChannel {
}
{// read the input until we hit preamble
int[] ptr=new int[2];
Mode[] modes={Mode.BINARY,Mode.TEXT};
byte[][] preambles = new byte[][]{Mode.BINARY.preamble, Mode.TEXT.preamble, Capability.PREAMBLE};
int[] ptr=new int[3];
Capability cap = new Capability(0); // remote capacity that we obtained. If we don't hear from remote, assume no capability
while(true) {
int ch = is.read();
if(ch==-1)
throw new EOFException("unexpected stream termination");
for(int i=0;i<2;i++) {
byte[] preamble = modes[i].preamble;
for(int i=0;i<preambles.length;i++) {
byte[] preamble = preambles[i];
if(preamble[ptr[i]]==ch) {
if(++ptr[i]==preamble.length) {
// found preamble
if(mode==Mode.NEGOTIATE) {
// now we know what the other side wants, so send the consistent preamble
mode = modes[i];
os.write(mode.preamble);
oos = new ObjectOutputStream(mode.wrap(os));
oos.flush();
} else {
if(modes[i]!=mode)
throw new IOException("Protocol negotiation failure");
switch (i) {
case 0:
case 1:
// transmission mode negotiation
if(mode==Mode.NEGOTIATE) {
// now we know what the other side wants, so send the consistent preamble
mode = modes[i];
os.write(mode.preamble);
oos = new ObjectOutputStream(mode.wrap(os));
oos.flush();
} else {
if(modes[i]!=mode)
throw new IOException("Protocol negotiation failure");
}
this.oos = oos;
this.ois = new ObjectInputStream(mode.wrap(is));
new ReaderThread(name).start();
this.remoteCapability = cap;
return;
case 2:
try {// capability negotiation
ObjectInputStream ois = new ObjectInputStream(Mode.TEXT.wrap(is));
cap = (Capability)ois.readObject();
} catch (ClassNotFoundException e) {
throw (Error)new NoClassDefFoundError(e.getMessage()).initCause(e);
}
}
this.oos = oos;
this.ois = new ObjectInputStream(mode.wrap(is));
new ReaderThread(name).start();
Capability rc = (Capability) getRemoteProperty(Capability.class);
if (rc==null) rc = new Capability(0); // assume no capability
this.remoteCapability = rc;
return;
ptr[i]=0; // reset
}
} else {
// didn't match.
......
package hudson.remoting;
/**
* @author Kohsuke Kawaguchi
*/
public class ChannelTest extends RmiTestBase {
public void testCapability() {
assertTrue(channel.remoteCapability.supportsMultiClassLoaderRPC());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册