提交 cf2eb25d 编写于 作者: K kohsuke

added preliminary version of the remoting infrastructure.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1294 71c3de6d-444a-0410-be80-ed276b4c234a
上级 d4d72e3c
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jvnet.hudson.main</groupId>
<artifactId>pom</artifactId>
<version>1.66-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>remoting</artifactId>
<packaging>jar</packaging>
<name>Remoting layer for Hudson</name>
<profiles>
<profile>
<id>debug</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<!-- run retrotranslator only during the release -->
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>retrotranslator-maven-plugin</artifactId>
<executions>
<execution>
<phase>process-classes</phase>
<goals>
<goal>translate</goal>
</goals>
<configuration>
<includes>
<include>
<directory>target/classes</directory>
</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>net.sf.retrotranslator</groupId>
<artifactId>retrotranslator-runtime</artifactId>
<version>1.0.8</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<version>2.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package hudson.remoting;
import java.io.Serializable;
/**
* @author Kohsuke Kawaguchi
*/
public interface Callable<V,T extends Throwable> extends Serializable {
V call() throws T;
}
package hudson.remoting;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Represents a communication channel to the remote peer.
*
* @author Kohsuke Kawaguchi
*/
public class Channel {
private final ObjectInputStream ois;
private final ObjectOutputStream oos;
/*package*/ final Executor executor;
private final ReaderThread reader;
/**
* If true, this data channel is already closed and
* no further calls are accepted.
*/
private boolean closed = false;
/*package*/ final Map<Integer,Request<?,?>> pendingCalls = new Hashtable<Integer,Request<?,?>>();
/**
* When sending a class definition to the other end, a classloader needed for it will be registered
* in this table.
*/
/*package*/ final ExportedClassLoaderTable exportedClassLoaders = new ExportedClassLoaderTable();
/**
* {@link ClassLoader}s that are proxies of the remote classloaders.
*/
/*package*/ final ImportedClassLoaderTable importedClassLoaders = new ImportedClassLoaderTable(this);
/**
*
* @param name
* Human readable name of this channel. Used for debug/logging. Can be anything.
* @param exec
* Commands sent from the remote peer will be executed by using this {@link Executor}.
* @param is
* Stream connected to the remote peer.
* @param os
* Stream connected to the remote peer.
*/
public Channel(String name, Executor exec, InputStream is, OutputStream os) throws IOException {
this.executor = exec;
this.oos = new ObjectOutputStream(os);
this.ois = new ObjectInputStream(is);
this.reader = new ReaderThread(name);
reader.start();
}
/**
* Sends a command to the remote end and executes it there.
*/
/*package*/ synchronized void send(Command cmd) throws IOException {
if(closed)
throw new IOException("already closed");
logger.fine("Send "+cmd);
oos.writeObject(cmd);
oos.reset();
}
/**
* Makes a remote procedure call.
*/
public <V extends Serializable,T extends Throwable>
V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
UserResponse<V> r = new UserRequest<V,T>(this, callable).call(this);
try {
return r.retrieve(this);
} catch (ClassNotFoundException e) {
// this is unlikely to happen, so this is a lame implementation
IOException x = new IOException();
x.initCause(e);
throw x;
}
}
private synchronized void terminate(IOException e) {
// abort
closed = true;
synchronized(pendingCalls) {
for (Request<?,?> req : pendingCalls.values())
req.abort(e);
pendingCalls.clear();
}
notify();
}
/**
* Waits for the close down of this {@link Channel}.
*/
public synchronized void join() throws InterruptedException {
while(!closed)
wait();
}
/**
* Notifies the remote peer that we are closing down.
*/
private static final class CloseCommand extends Command {
protected void execute(Channel channel) {
try {
channel.close();
} catch (IOException e) {
logger.log(Level.SEVERE,"close command failed",e);
}
}
public String toString() {
return "close";
}
}
/**
* Performs an orderly shut down of this channel (and the remote peer.)
*/
public void close() throws IOException {
if(closed) return;
send(new CloseCommand());
// TODO: would be nice if we can wait for the completion of pending requests
terminate(null);
}
private final class ReaderThread extends Thread {
public ReaderThread(String name) {
super("DataChannel reader thread: "+name);
}
public void run() {
try {
while(!closed) {
try {
Command cmd = (Command)ois.readObject();
logger.fine("Received "+cmd);
cmd.execute(Channel.this);
} catch (ClassNotFoundException e) {
logger.log(Level.SEVERE, "Unabled to read a command",e);
}
}
ois.close();
oos.close();
} catch (IOException e) {
logger.log(Level.SEVERE, "I/O error in DataChannel",e);
terminate(e);
}
}
}
private static final Logger logger = Logger.getLogger(Channel.class.getName());
}
package hudson.remoting;
/**
* @author Kohsuke Kawaguchi
*/
package hudson.remoting;
import java.io.Serializable;
/**
* One-way command to be sent over to the remote system and executed.
*
* This is layer 0, the lower most layer.
*
* @author Kohsuke Kawaguchi
*/
abstract class Command implements Serializable {
/**
* Called on a remote system to perform this command.
*
* @param channel
* The {@link Channel} of the remote system.
*/
protected abstract void execute(Channel channel);
private static final long serialVersionUID = 1L;
}
package hudson.remoting;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
/**
* Manages unique ID for classloaders.
*
* @author Kohsuke Kawaguchi
*/
final class ExportedClassLoaderTable {
private final Map<Integer, WeakReference<ClassLoader>> table = new HashMap<Integer, WeakReference<ClassLoader>>();
private final WeakHashMap<ClassLoader,Integer> reverse = new WeakHashMap<ClassLoader,Integer>();
// id==0 is reserved for bootstrap classloader
private int iota = 1;
public synchronized int intern(ClassLoader cl) {
if(cl==null) return 0; // bootstrap classloader
Integer id = reverse.get(cl);
if(id==null) {
id = iota++;
table.put(id,new WeakReference<ClassLoader>(cl));
reverse.put(cl,id);
}
return id;
}
public synchronized ClassLoader get(int id) {
WeakReference<ClassLoader> ref = table.get(id);
if(ref==null) return null;
return ref.get();
}
}
package hudson.remoting;
import java.util.Hashtable;
import java.util.Map;
/**
* @author Kohsuke Kawaguchi
*/
final class ImportedClassLoaderTable {
final Channel channel;
final Map<Integer,ClassLoader> classLoaders = new Hashtable<Integer,ClassLoader>();
ImportedClassLoaderTable(Channel channel) {
this.channel = channel;
}
public synchronized ClassLoader get(int id) {
ClassLoader r = classLoaders.get(id);
if(r==null) {
// we need to be able to use the same hudson.remoting classes, hence delegate
// to this class loader.
r = new RemoteClassLoader(getClass().getClassLoader(),channel,id);
classLoaders.put(id,r);
}
return r;
}
}
package hudson.remoting;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
/**
* {@link ObjectInputStream} that uses a specific class loader.
*/
final class ObjectInputStreamEx extends ObjectInputStream {
private final ClassLoader cl;
public ObjectInputStreamEx(InputStream in, ClassLoader cl) throws IOException {
super(in);
this.cl = cl;
}
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
String name = desc.getName();
try {
return Class.forName(name, false, cl);
} catch (ClassNotFoundException ex) {
return super.resolveClass(desc);
}
}
}
package hudson.remoting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* Loads class files from the other peer through {@link Channel}.
*
* @author Kohsuke Kawaguchi
*/
final class RemoteClassLoader extends ClassLoader {
private final Channel channel;
private final int id;
public RemoteClassLoader(ClassLoader parent, Channel channel, int id) {
super(parent);
this.id = id;
this.channel = channel;
}
protected Class<?> findClass(String name) throws ClassNotFoundException {
try {
byte[] bytes = new ClassImageFetchRequest(id,name).call(channel);
return defineClass(name, bytes, 0, bytes.length);
} catch (InterruptedException e) {
ClassNotFoundException x = new ClassNotFoundException(e.getMessage());
x.initCause(e);
throw x;
} catch (IOException e) {
ClassNotFoundException x = new ClassNotFoundException(e.getMessage());
x.initCause(e);
throw x;
}
}
private static final class ClassImageFetchRequest extends Request<byte[],ClassNotFoundException> {
private final int id;
private final String className;
public ClassImageFetchRequest(int id, String className) {
this.id = id;
this.className = className;
}
protected byte[] perform(Channel channel) throws ClassNotFoundException {
ClassLoader cl = channel.exportedClassLoaders.get(id);
if(cl==null)
throw new ClassNotFoundException();
InputStream in = cl.getResourceAsStream(className.replace('.', '/') + ".class");
if(in==null)
throw new ClassNotFoundException();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
byte[] buf = new byte[8192];
int len;
while((len=in.read(buf))>0)
baos.write(buf,0,len);
} catch (IOException e) {
throw new ClassNotFoundException();
}
return baos.toByteArray();
}
public String toString() {
return "fetchClassImage : "+className;
}
}
}
package hudson.remoting;
import java.io.IOException;
import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Request/response pattern over {@link Command}.
*
* This is layer 1. This assumes that the receiving side has all the class definitions
* available to de-serialize {@link Request}.
*
* @author Kohsuke Kawaguchi
* @see Response
*/
abstract class Request<RSP extends Serializable,EXC extends Throwable> extends Command {
/**
* Executed on a remote system to perform the task.
*
* @param channel
* remote data channel.
* @return
* the return value will be sent back to the calling process.
* @throws EXC
* The exception will be forwarded to the calling process.
* If no checked exception is supposed to be thrown, use {@link RuntimeException}.
*/
protected abstract RSP perform(Channel channel) throws EXC;
/**
* Uniquely identifies this request.
*/
private final int id;
private Response<RSP,EXC> response;
protected Request() {
synchronized(Request.class) {
id = nextId++;
}
}
/**
* Sends this request to a remote system, and blocks until we receives a response.
*/
public synchronized final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
channel.pendingCalls.put(id,this);
channel.send(this);
response=null;
while(response==null)
wait(); // wait until the response arrives
if(response.exception!=null)
throw response.exception;
return response.returnValue;
}
/**
* Called by the {@link Response} when we received it.
*/
/*package*/ synchronized void onCompleted(Response<RSP,EXC> response) {
this.response = response;
notify();
}
/**
* Aborts the processing. The calling thread will receive an exception.
*/
/*package*/ void abort(IOException e) {
onCompleted(new Response(id,new RequestAbortedException(e)));
}
/**
* Schedules the execution of this request.
*/
protected final void execute(final Channel channel) {
channel.executor.execute(new Runnable() {
public void run() {
try {
RSP rsp;
try {
rsp = Request.this.perform(channel);
} catch (Throwable t) {
// error return
channel.send(new Response<RSP,Throwable>(id,t));
return;
}
// normal completion
channel.send(new Response<RSP,EXC>(id,rsp));
} catch (IOException e) {
// communication error.
// this means the caller will block forever
logger.log(Level.SEVERE, "Failed to send back a reply",e);
}
}
});
}
/**
* Next request ID.
*/
private static int nextId=0;
private static final long serialVersionUID = 1L;
private static final Logger logger = Logger.getLogger(Request.class.getName());
}
package hudson.remoting;
/**
* Signals that the communication is aborted and thus
* the pending {@link Request} will never recover its {@link Response}.
*
* @author Kohsuke Kawaguchi
*/
public class RequestAbortedException extends RuntimeException {
public RequestAbortedException(Throwable cause) {
super(cause);
}
}
package hudson.remoting;
/**
* Request/response pattern over {@link Command}.
*
* This is layer 1.
*
* @author Kohsuke Kawaguchi
* @see Request
*/
final class Response<RSP,EXC extends Throwable> extends Command {
/**
* ID of the {@link Request} for which
*/
private final int id;
final RSP returnValue;
final EXC exception;
Response(int id, RSP returnValue) {
this.id = id;
this.returnValue = returnValue;
this.exception = null;
}
Response(int id, EXC exception) {
this.id = id;
this.returnValue = null;
this.exception = exception;
}
/**
* Notifies the waiting {@link Request}.
*/
@Override
protected void execute(Channel channel) {
Request req = channel.pendingCalls.get(id);
if(req==null)
return; // maybe aborted
req.onCompleted(this);
}
public String toString() {
return "Response[retVal="+toString(returnValue)+",exception="+toString(exception);
}
private static String toString(Object o) {
if(o==null) return "null";
else return o.toString();
}
private static final long serialVersionUID = 1L;
}
package hudson.remoting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* {@link Request} that can take {@link Callable} whose actual implementation
* may not be known to the remote system in advance.
*
* <p>
* This code assumes that the {@link Callable} object and all reachable code
* are loaded by a single classloader.
*
* @author Kohsuke Kawaguchi
*/
final class UserRequest<RSP extends Serializable,EXC extends Throwable> extends Request<UserResponse<RSP>,EXC> {
private final byte[] request;
private final int classLoaderId;
private final String toString;
public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
request = serialize(c);
this.toString = c.toString();
classLoaderId = local.exportedClassLoaders.intern(c.getClass().getClassLoader());
}
protected UserResponse<RSP> perform(Channel channel) throws EXC {
try {
ClassLoader cl = channel.importedClassLoaders.get(classLoaderId);
Object o = new ObjectInputStreamEx(new ByteArrayInputStream(request), cl).readObject();
Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o;
ClassLoader old = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
// execute the service
RSP r = null;
try {
r = callable.call();
} finally {
Thread.currentThread().setContextClassLoader(old);
}
return new UserResponse<RSP>(serialize(r),classLoaderId);
} catch (IOException e) {
// propagate this to the calling process
throw (EXC)e;
} catch (ClassNotFoundException e) {
// propagate this to the calling process
throw (EXC)e;
}
}
private byte[] serialize(Object o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(o);
return baos.toByteArray();
}
public String toString() {
return "UserRequest:"+toString;
}
}
final class UserResponse<RSP extends Serializable> implements Serializable {
private final byte[] response;
private final int classLoaderId;
public UserResponse(byte[] response, int classLoaderId) {
this.response = response;
this.classLoaderId = classLoaderId;
}
public RSP retrieve(Channel channel) throws IOException, ClassNotFoundException {
return (RSP) new ObjectInputStreamEx(
new ByteArrayInputStream(response),
channel.exportedClassLoaders.get(classLoaderId)
).readObject();
}
private static final long serialVersionUID = 1L;
}
package hudson.remoting;
/**
* @author Kohsuke Kawaguchi
*/
public class ClassRemotingTest extends RmiTestBase {
public void test1() throws Throwable {
DummyClassLoader cl = new DummyClassLoader(this.getClass().getClassLoader());
Callable c = (Callable) cl.loadClass("hudson.remoting.test.TestCallable").newInstance();
Object r = channel.call(c);
System.out.println(r);
assertTrue(r.toString().startsWith("hudson.remoting.RemoteClassLoader@"));
}
}
package hudson.remoting;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.File;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.net.URL;
/**
* Used to load a dummay class <tt>hudson.remoting.test.TestCallable</tt>
* out of nowhere, to test {@link RemoteClassLoader}.
*
* @author Kohsuke Kawaguchi
*/
class DummyClassLoader extends ClassLoader {
public DummyClassLoader(ClassLoader parent) {
super(parent);
}
protected Class<?> findClass(String name) throws ClassNotFoundException {
if(name.equals("hudson.remoting.test.TestCallable")) {
// rename a class
try {
byte[] bytes = loadTransformedClassImage(name);
return defineClass(name,bytes,0,bytes.length);
} catch (IOException e) {
throw new ClassNotFoundException("Bytecode manipulation failed",e);
}
}
return super.findClass(name);
}
private byte[] loadTransformedClassImage(final String name) throws IOException {
InputStream in = getResourceAsStream("hudson/remoting/TestCallable.class");
// rename a class
ClassReader cr = new ClassReader(in);
ClassWriter w = new ClassWriter(cr,true) {
public void visit(int version, int access, String _name, String sig, String superName, String[] interfaces) {
super.visit(version, access, name.replace('.','/'), sig, superName, interfaces);
}
};
cr.accept(w,false);
return w.toByteArray();
}
protected URL findResource(String name) {
if(name.equals("hudson/remoting/test/TestCallable.class")) {
try {
File f = File.createTempFile("rmiTest","class");
OutputStream os = new FileOutputStream(f);
os.write(loadTransformedClassImage("hudson.remoting.test.TestCallable"));
os.close();
return f.toURL();
} catch (IOException e) {
return null;
}
}
return super.findResource(name);
}
}
package hudson.remoting;
import junit.framework.TestCase;
/**
* @author Kohsuke Kawaguchi
*/
public class DummyClassLoaderTest extends TestCase {
public void testLoad() throws Throwable {
DummyClassLoader cl = new DummyClassLoader(this.getClass().getClassLoader());
Callable c = (Callable) cl.loadClass("hudson.remoting.test.TestCallable").newInstance();
System.out.println(c.call());
// make sure that the returned class is loaded from the dummy classloader
assertTrue(c.call().toString().startsWith(DummyClassLoader.class.getName()));
}
}
package hudson.remoting;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Kohsuke Kawaguchi
*/
public abstract class RmiTestBase extends TestCase {
protected Channel channel;
private ExecutorService executor;
private Exception failure;
protected void setUp() throws Exception {
final PipedInputStream in1 = new PipedInputStream();
final PipedOutputStream out1 = new PipedOutputStream(in1);
final PipedInputStream in2 = new PipedInputStream();
final PipedOutputStream out2 = new PipedOutputStream(in2);
executor = Executors.newCachedThreadPool();
Thread t = new Thread("south bridge runner") {
public void run() {
try {
Channel s = new Channel("south", executor, in2, out1);
s.join();
System.out.println("south completed");
} catch (IOException e) {
e.printStackTrace();
failure = e;
} catch (InterruptedException e) {
e.printStackTrace();
failure = e;
}
}
};
t.start();
channel = new Channel("north", executor, in1, out2);
}
protected void tearDown() throws Exception {
channel.close();
System.out.println("north completed");
executor.shutdown();
if(failure!=null)
throw failure; // report a failure in the south side
}
}
package hudson.remoting;
/**
* @author Kohsuke Kawaguchi
*/
public class SimpleTest extends RmiTestBase {
public void test1() throws Exception {
int r = channel.call(new Callable1());
System.out.println("result=" + r);
}
private static class Callable1 implements Callable<Integer, RuntimeException> {
public Integer call() throws RuntimeException {
System.out.println("invoked");
return 5;
}
}
public void test2() throws Exception {
try {
channel.call(new Callable2());
fail();
} catch (RuntimeException e) {
assertEquals(e.getMessage(),"foo");
}
}
private static class Callable2 implements Callable<Integer, RuntimeException> {
public Integer call() throws RuntimeException {
throw new RuntimeException("foo");
}
}
}
package hudson.remoting;
/**
* {@link Callable} used to verify the classloader used.
*
* @author Kohsuke Kawaguchi
*/
public class TestCallable implements Callable {
public Object call() {
return getClass().getClassLoader().toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册