提交 e777ee6e 编写于 作者: A Aljoscha Krettek

Merge pull request #155 from aljoscha/user-code-loader-bugfix

Fix bug with user-code wrappers
......@@ -18,8 +18,10 @@ package eu.stratosphere.pact.common.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
......@@ -31,6 +33,30 @@ import eu.stratosphere.nephele.configuration.Configuration;
*/
public class InstantiationUtil {
/**
* A custom ObjectInputStream that can also load user-code using a
* user-code ClassLoader.
*
*/
private static class ClassLoaderObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;
@Override
public Class<?> resolveClass(ObjectStreamClass desc)
throws IOException, ClassNotFoundException {
if (classLoader != null) {
return Class.forName(desc.getName(), false, classLoader);
}
return super.resolveClass(desc);
}
public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
super(in);
this.classLoader = classLoader;
}
}
/**
* Creates a new instance of the given class.
*
......@@ -171,7 +197,7 @@ public class InstantiationUtil {
}
}
public static Object readObjectFormConfig(Configuration config, String key) throws IOException, ClassNotFoundException {
public static Object readObjectFormConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
byte[] bytes = config.getBytes(key, null);
if (bytes == null) {
return null;
......@@ -179,7 +205,7 @@ public class InstantiationUtil {
ObjectInputStream oois = null;
try {
oois = new ObjectInputStream(new ByteArrayInputStream(bytes));
oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl);
return oois.readObject();
} finally {
if (oois != null) {
......
......@@ -210,7 +210,6 @@ public class DataSinkTask<IT> extends AbstractOutputTask
* Throws if instance of OutputFormat implementation can not be
* obtained.
*/
@SuppressWarnings("unchecked")
private void initOutputFormat() {
if (this.userCodeClassLoader == null) {
try {
......@@ -225,7 +224,8 @@ public class DataSinkTask<IT> extends AbstractOutputTask
this.config = new TaskConfig(taskConf);
try {
this.format = (OutputFormat<IT>)this.config.getStubWrapper().getUserCodeObject();
this.format = config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
// check if the class is a subclass, if the check is required
if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
......
......@@ -47,7 +47,6 @@ import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.util.InstantiationUtil;
import eu.stratosphere.pact.common.util.MutableObjectIterator;
import eu.stratosphere.pact.common.util.PactConfigConstants;
import eu.stratosphere.pact.generic.contract.UserCodeWrapper;
import eu.stratosphere.pact.generic.stub.GenericReducer;
import eu.stratosphere.pact.generic.types.TypeComparator;
import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
......@@ -479,10 +478,9 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
// public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) {
// try {
// T stub = (T) ((UserCodeWrapper<T>) config.getStubWrapper()).getUserCodeObject(superClass, cl);
@SuppressWarnings("unchecked")
protected <T> T initStub(Class<? super T> stubSuperClass) throws Exception {
protected S initStub(Class<? super S> stubSuperClass) throws Exception {
try {
T stub = (T) ((UserCodeWrapper<T>) config.getStubWrapper()).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
// check if the class is a subclass, if the check is required
if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
......@@ -1305,10 +1303,9 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
*
* @return An instance of the user code class.
*/
@SuppressWarnings("unchecked")
public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) {
try {
T stub = (T) ((UserCodeWrapper<T>) config.getStubWrapper()).getUserCodeObject(superClass, cl);
T stub = config.<T>getStubWrapper(cl).getUserCodeObject(superClass, cl);
// check if the class is a subclass, if the check is required
if (superClass != null && !superClass.isAssignableFrom(stub.getClass())) {
throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
......
......@@ -253,10 +253,10 @@ public class TaskConfig {
}
@SuppressWarnings("unchecked")
public <T> UserCodeWrapper<T> getStubWrapper()
public <T> UserCodeWrapper<T> getStubWrapper(ClassLoader cl)
{
try {
return (UserCodeWrapper<T>) InstantiationUtil.readObjectFormConfig(this.config, STUB_OBJECT);
return (UserCodeWrapper<T>) InstantiationUtil.readObjectFormConfig(this.config, STUB_OBJECT, cl);
} catch (ClassNotFoundException e) {
throw new CorruptConfigurationException("Could not read the user code wrapper: " + e);
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册