提交 91879ec8 编写于 作者: K kohsuke

making more progress.

- implemented the remote method call.
- class loader export mechanism is now consolidated with the general object export mechanism.


git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@1296 71c3de6d-444a-0410-be80-ed276b4c234a
上级 4036c46d
......@@ -6,6 +6,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Executor;
......@@ -33,15 +34,14 @@ public class Channel {
/*package*/ final Map<Integer,Request<?,?>> pendingCalls = new Hashtable<Integer,Request<?,?>>();
/**
* When sending a class definition to the other end, a classloader needed for it will be registered
* in this table.
* {@link ClassLoader}s that are proxies of the remote classloaders.
*/
/*package*/ final ExportedClassLoaderTable exportedClassLoaders = new ExportedClassLoaderTable();
/*package*/ final ImportedClassLoaderTable importedClassLoaders = new ImportedClassLoaderTable(this);
/**
* {@link ClassLoader}s that are proxies of the remote classloaders.
* Objects exported via {@link #export(Class, Object)}.
*/
/*package*/ final ImportedClassLoaderTable importedClassLoaders = new ImportedClassLoaderTable(this);
/*package*/ final ExportTable<Object> exportedObjects = new ExportTable<Object>();
/**
*
......@@ -69,10 +69,33 @@ public class Channel {
if(closed)
throw new IOException("already closed");
logger.fine("Send "+cmd);
oos.writeObject(cmd);
Channel old = Channel.setCurrent(this);
try {
oos.writeObject(cmd);
} finally {
Channel.setCurrent(old);
}
oos.reset();
}
/**
* Exports an object for remoting to the other {@link Channel}.
*
* @param type
* Interface to be remoted.
* @return
* the proxy object that implements <tt>T</tt>. This object can be transfered
* to the other {@link Channel}, and calling methods on it will invoke the
* same method on the given <tt>instance</tt> object.
*/
/*package*/ synchronized <T> T export(Class<T> type, T instance) {
// TODO: unexport
final int id = exportedObjects.intern(instance);
return type.cast(Proxy.newProxyInstance( type.getClassLoader(), new Class[]{type},
new RemoteInvocationHandler(id)));
}
/**
* Makes a remote procedure call.
*/
......@@ -80,7 +103,7 @@ public class Channel {
V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
UserResponse<V> r = new UserRequest<V,T>(this, callable).call(this);
try {
return r.retrieve(this);
return r.retrieve(this, callable.getClass().getClassLoader());
} catch (ClassNotFoundException e) {
// this is unlikely to happen, so this is a lame implementation
IOException x = new IOException();
......@@ -146,7 +169,13 @@ public class Channel {
try {
while(!closed) {
try {
Command cmd = (Command)ois.readObject();
Command cmd = null;
Channel old = Channel.setCurrent(Channel.this);
try {
cmd = (Command)ois.readObject();
} finally {
Channel.setCurrent(old);
}
logger.fine("Received "+cmd);
cmd.execute(Channel.this);
} catch (ClassNotFoundException e) {
......@@ -162,5 +191,27 @@ public class Channel {
}
}
/*package*/ static Channel setCurrent(Channel channel) {
Channel old = CURRENT.get();
CURRENT.set(channel);
return old;
}
/**
* This method can be invoked during the serialization/deserialization of
* objects, when they are transferred to the remote {@link Channel}.
*
* @return null
* if the calling thread is not performing serialization.
*/
public static Channel current() {
return CURRENT.get();
}
/**
* Remembers the current "channel" associated for this thread.
*/
private static final ThreadLocal<Channel> CURRENT = new ThreadLocal<Channel>();
private static final Logger logger = Logger.getLogger(Channel.class.getName());
}
package hudson.remoting;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
/**
* Manages unique ID for classloaders.
*
* @author Kohsuke Kawaguchi
*/
final class ExportTable<T> {
private final Map<Integer, WeakReference<T>> table = new HashMap<Integer, WeakReference<T>>();
private final WeakHashMap<T,Integer> reverse = new WeakHashMap<T,Integer>();
// id==0 is reserved for bootstrap classloader
private int iota = 1;
public synchronized int intern(T cl) {
if(cl==null) return 0; // bootstrap classloader
Integer id = reverse.get(cl);
if(id==null) {
id = iota++;
table.put(id,new WeakReference<T>(cl));
reverse.put(cl,id);
}
return id;
}
public synchronized T get(int id) {
WeakReference<T> ref = table.get(id);
if(ref==null) return null;
return ref.get();
}
}
package hudson.remoting;
import hudson.remoting.RemoteClassLoader.IClassLoader;
import java.util.Hashtable;
import java.util.Map;
......@@ -8,19 +10,19 @@ import java.util.Map;
*/
final class ImportedClassLoaderTable {
final Channel channel;
final Map<Integer,ClassLoader> classLoaders = new Hashtable<Integer,ClassLoader>();
final Map<IClassLoader,ClassLoader> classLoaders = new Hashtable<IClassLoader,ClassLoader>();
ImportedClassLoaderTable(Channel channel) {
this.channel = channel;
}
public synchronized ClassLoader get(int id) {
ClassLoader r = classLoaders.get(id);
public synchronized ClassLoader get(IClassLoader classLoaderProxy) {
ClassLoader r = classLoaders.get(classLoaderProxy);
if(r==null) {
// we need to be able to use the same hudson.remoting classes, hence delegate
// to this class loader.
r = new RemoteClassLoader(getClass().getClassLoader(),channel,id);
classLoaders.put(id,r);
r = new RemoteClassLoader(getClass().getClassLoader(),classLoaderProxy);
classLoaders.put(classLoaderProxy,r);
}
return r;
}
......
package hudson.remoting;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.BufferedOutputStream;
import java.io.PipedOutputStream;
/**
* @author Kohsuke Kawaguchi
*/
public final class Pipe implements Serializable {
private InputStream in;
private OutputStream out;
private Pipe(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
public InputStream getIn() {
return in;
}
public OutputStream getOut() {
return out;
}
public static Pipe create() {
// OutputStream will be created on the target
return new Pipe(new PipedInputStream(),null);
}
private void writeObject(ObjectOutputStream oos) throws IOException {
if(in!=null && out==null) {
// remote will write to local
IStream proxy = Channel.current().export(IStream.class, new IStream() {
PipedOutputStream pos = new PipedOutputStream((PipedInputStream)in);
public void write(byte[] buf) throws IOException {
pos.write(buf);
}
public void close() throws IOException {
pos.close();
}
});
oos.writeBoolean(true); // marker
oos.writeObject(proxy);
} else {
// TODO: remote will read from local
throw new UnsupportedOperationException();
}
}
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
final Channel channel = Channel.current();
assert channel !=null;
if(ois.readBoolean()) {
// local will write to remote
final IStream proxy = (IStream)ois.readObject();
in = null;
out = new BufferedOutputStream(new OutputStream() {
public void write(int b) throws IOException {
write(new byte[]{(byte)b},0,1);
}
public void write(byte b[], int off, int len) throws IOException {
if(off==0 && len==b.length)
write(b);
else {
byte[] buf = new byte[len];
System.arraycopy(b,off,buf,0,len);
write(buf);
}
}
public void write(byte b[]) throws IOException {
proxy.write(b);
}
public void close() throws IOException {
proxy.close();
}
});
} else {
// TODO
throw new UnsupportedOperationException();
}
}
private static interface IStream {
void write(byte[] buf) throws IOException;
void close() throws IOException;
}
private static final long serialVersionUID = 1L;
}
......@@ -10,44 +10,37 @@ import java.io.InputStream;
* @author Kohsuke Kawaguchi
*/
final class RemoteClassLoader extends ClassLoader {
private final Channel channel;
private final int id;
private final IClassLoader proxy;
public RemoteClassLoader(ClassLoader parent, Channel channel, int id) {
public RemoteClassLoader(ClassLoader parent, IClassLoader proxy) {
super(parent);
this.id = id;
this.channel = channel;
this.proxy = proxy;
}
protected Class<?> findClass(String name) throws ClassNotFoundException {
try {
byte[] bytes = new ClassImageFetchRequest(id,name).call(channel);
return defineClass(name, bytes, 0, bytes.length);
} catch (InterruptedException e) {
ClassNotFoundException x = new ClassNotFoundException(e.getMessage());
x.initCause(e);
throw x;
} catch (IOException e) {
ClassNotFoundException x = new ClassNotFoundException(e.getMessage());
x.initCause(e);
throw x;
}
byte[] bytes = proxy.fetch(name);
return defineClass(name, bytes, 0, bytes.length);
}
private static final class ClassImageFetchRequest extends Request<byte[],ClassNotFoundException> {
private final int id;
private final String className;
/**
* Remoting interface.
*/
/*package*/ static interface IClassLoader {
byte[] fetch(String className) throws ClassNotFoundException;
}
public ClassImageFetchRequest(int id, String className) {
this.id = id;
this.className = className;
}
public static IClassLoader export(ClassLoader cl, Channel local) {
return local.export(IClassLoader.class, new ClassLoaderProxy(cl));
}
protected byte[] perform(Channel channel) throws ClassNotFoundException {
ClassLoader cl = channel.exportedClassLoaders.get(id);
if(cl==null)
throw new ClassNotFoundException();
/*package*/ static final class ClassLoaderProxy implements IClassLoader {
private final ClassLoader cl;
public ClassLoaderProxy(ClassLoader cl) {
this.cl = cl;
}
public byte[] fetch(String className) throws ClassNotFoundException {
InputStream in = cl.getResourceAsStream(className.replace('.', '/') + ".class");
if(in==null)
throw new ClassNotFoundException();
......@@ -66,9 +59,15 @@ final class RemoteClassLoader extends ClassLoader {
return baos.toByteArray();
}
public boolean equals(Object that) {
if (this == that) return true;
if (that == null || getClass() != that.getClass()) return false;
return cl.equals(((ClassLoaderProxy) that).cl);
}
public String toString() {
return "fetchClassImage : "+className;
public int hashCode() {
return cl.hashCode();
}
}
}
package hudson.remoting;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.IOException;
/**
* Sits behind a proxy object and implements the proxy logic.
*
* @author Kohsuke Kawaguchi
*/
final class RemoteInvocationHandler implements InvocationHandler, Serializable {
/**
* This proxy acts as a proxy to the object of
* Object ID on the remote {@link Channel}.
*/
private final int oid;
/**
* Represents the connection to the remote {@link Channel}.
*/
private transient Channel channel;
RemoteInvocationHandler(int id) {
this.oid = id;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(channel==null)
throw new IllegalStateException("proxy is not connected to a channel");
if(args==null) args = EMPTY_ARRAY;
if(method.getDeclaringClass()==Object.class) {
// handle equals and hashCode by ourselves
try {
return method.invoke(this,args);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
} else {
// delegate the rest of the methods to the remote object
return new RPCRequest(oid,method,args).call(channel);
}
}
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
channel = Channel.current();
ois.defaultReadObject();
}
/**
* Two proxies are the same iff they represent the same remote object.
*/
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RemoteInvocationHandler that = (RemoteInvocationHandler) o;
if (oid != that.oid) return false;
if (channel!=that.channel) return false;
return true;
}
public int hashCode() {
return oid;
}
private static final long serialVersionUID = 1L;
private static final class RPCRequest extends Request<Serializable,Throwable> {
/**
* Target object id to invoke.
*/
private final int oid;
private final String methodName;
/**
* Type name of the arguments to invoke. They are names because
* neither {@link Method} nor {@link Class} is serializable.
*/
private final String[] types;
/**
* Arguments to invoke the method with.
*/
private final Object[] arguments;
public RPCRequest(int oid, Method m, Object[] arguments) {
this.oid = oid;
this.arguments = arguments;
this.methodName = m.getName();
this.types = new String[arguments.length];
Class<?>[] params = m.getParameterTypes();
for( int i=0; i<arguments.length; i++ )
types[i] = params[i].getName();
}
protected Serializable perform(Channel channel) throws Throwable {
Object o = channel.exportedObjects.get(oid);
try {
return (Serializable)choose(o).invoke(o,arguments);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
/**
* Chooses the method to invoke.
*/
private Method choose(Object o) {
OUTER:
for(Method m : o.getClass().getMethods()) {
if(!m.getName().equals(methodName))
continue;
Class<?>[] paramTypes = m.getParameterTypes();
if(types.length!=arguments.length)
continue;
for( int i=0; i<types.length; i++ ) {
if(!types[i].equals(paramTypes[i].getName()))
continue OUTER;
}
return m;
}
return null;
}
}
private static final Object[] EMPTY_ARRAY = new Object[0];
}
package hudson.remoting;
import hudson.remoting.RemoteClassLoader.IClassLoader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
......@@ -19,20 +21,26 @@ import java.io.Serializable;
final class UserRequest<RSP extends Serializable,EXC extends Throwable> extends Request<UserResponse<RSP>,EXC> {
private final byte[] request;
private final int classLoaderId;
private final IClassLoader classLoaderProxy;
private final String toString;
public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
request = serialize(c);
request = serialize(c,local);
this.toString = c.toString();
classLoaderId = local.exportedClassLoaders.intern(c.getClass().getClassLoader());
classLoaderProxy = RemoteClassLoader.export( c.getClass().getClassLoader(), local );
}
protected UserResponse<RSP> perform(Channel channel) throws EXC {
try {
ClassLoader cl = channel.importedClassLoaders.get(classLoaderId);
ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);
Object o = new ObjectInputStreamEx(new ByteArrayInputStream(request), cl).readObject();
Object o;
Channel oldc = Channel.setCurrent(channel);
try {
o = new ObjectInputStreamEx(new ByteArrayInputStream(request), cl).readObject();
} finally {
Channel.setCurrent(oldc);
}
Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o;
ClassLoader old = Thread.currentThread().getContextClassLoader();
......@@ -45,7 +53,7 @@ final class UserRequest<RSP extends Serializable,EXC extends Throwable> extends
Thread.currentThread().setContextClassLoader(old);
}
return new UserResponse<RSP>(serialize(r),classLoaderId);
return new UserResponse<RSP>(serialize(r,channel));
} catch (IOException e) {
// propagate this to the calling process
throw (EXC)e;
......@@ -55,10 +63,15 @@ final class UserRequest<RSP extends Serializable,EXC extends Throwable> extends
}
}
private byte[] serialize(Object o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(o);
return baos.toByteArray();
private byte[] serialize(Object o, Channel localChannel) throws IOException {
Channel old = Channel.setCurrent(localChannel);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(o);
return baos.toByteArray();
} finally {
Channel.setCurrent(old);
}
}
public String toString() {
......@@ -68,18 +81,13 @@ final class UserRequest<RSP extends Serializable,EXC extends Throwable> extends
final class UserResponse<RSP extends Serializable> implements Serializable {
private final byte[] response;
private final int classLoaderId;
public UserResponse(byte[] response, int classLoaderId) {
public UserResponse(byte[] response) {
this.response = response;
this.classLoaderId = classLoaderId;
}
public RSP retrieve(Channel channel) throws IOException, ClassNotFoundException {
return (RSP) new ObjectInputStreamEx(
new ByteArrayInputStream(response),
channel.exportedClassLoaders.get(classLoaderId)
).readObject();
public RSP retrieve(Channel channel, ClassLoader cl) throws IOException, ClassNotFoundException {
return (RSP) new ObjectInputStreamEx(new ByteArrayInputStream(response),cl).readObject();
}
private static final long serialVersionUID = 1L;
......
/**
* Remoting infrastructure for Hudson.
*
* Code in this package is used for running a part of a program in slaves.
*/
package hudson.remoting;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册