提交 d8589303 编写于 作者: S Stephan Ewen

[FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and...

[FLINK-1286] [APIs] [runtime] Fix serialization in CollectionInputFormat and generate meaningful error messages
上级 82f5154a
......@@ -34,6 +34,7 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.dag.TempMode;
......@@ -835,6 +836,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
vertex.setInvokableClass(DataSourceTask.class);
vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
......@@ -850,7 +852,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
vertex.setInvokableClass(DataSinkTask.class);
vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper()));
// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
config.setStubParameters(node.getPactContract().getParameters());
......@@ -1426,6 +1429,25 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
}
private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
try {
if (wrapper.hasObject()) {
try {
return wrapper.getUserCodeObject().toString();
}
catch (Throwable t) {
return wrapper.getUserCodeClass().getName();
}
}
else {
return wrapper.getUserCodeClass().getName();
}
}
catch (Throwable t) {
return null;
}
}
// -------------------------------------------------------------------------------------
// Descriptors for tasks / configurations that are chained or merged with other tasks
......
......@@ -53,4 +53,9 @@ public class UserCodeClassWrapper<T> implements UserCodeWrapper<T> {
public Class<? extends T> getUserCodeClass() {
return userCodeClass;
}
@Override
public boolean hasObject() {
return false;
}
}
......@@ -121,4 +121,9 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
public Class<? extends T> getUserCodeClass() {
return (Class<? extends T>) userCodeObject.getClass();
}
@Override
public boolean hasObject() {
return true;
}
}
......@@ -22,29 +22,26 @@ import java.io.Serializable;
import java.lang.annotation.Annotation;
/**
* PACT contracts can have either a class or an object containing the user
* UDf operators can have either a class or an object containing the user
* code, this is the common interface to access them.
*/
public interface UserCodeWrapper<T> extends Serializable {
/**
* Gets the user code object. In the case of a pact, that object will be the stub with the user function,
* in the case of an input or output format, it will be the format object.
*
* Gets the user code object, which may be either a function or an input or output format.
* The subclass is supposed to just return the user code object or instantiate the class.
*
* @return The class with the user code.
*/
public T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
T getUserCodeObject(Class<? super T> superClass, ClassLoader cl);
/**
* Gets the user code object. In the case of a pact, that object will be the stub with the user function,
* in the case of an input or output format, it will be the format object.
*
* The subclass is supposed to just return the user code object or instantiate the class.
*
* @return The class with the user code.
*/
public T getUserCodeObject();
T getUserCodeObject();
/**
* Gets an annotation that pertains to the user code class. By default, this method will look for
......@@ -55,7 +52,7 @@ public interface UserCodeWrapper<T> extends Serializable {
* the Class object corresponding to the annotation type
* @return the annotation, or null if no annotation of the requested type was found
*/
public <A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
<A extends Annotation> A getUserCodeAnnotation(Class<A> annotationClass);
/**
* Gets the class of the user code. If the user code is provided as a class, this class is just returned.
......@@ -63,5 +60,12 @@ public interface UserCodeWrapper<T> extends Serializable {
*
* @return The class of the user code object.
*/
public Class<? extends T> getUserCodeClass ();
Class<? extends T> getUserCodeClass ();
/**
* Checks whether the wrapper already has an object, or whether it needs to instantiate it.
*
* @return True, if the wrapper has already an object, false if it has only a class.
*/
boolean hasObject();
}
......@@ -78,11 +78,15 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeInt(dataSet.size());
OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
for (T element : dataSet){
serializer.serialize(element, wrapper);
final int size = dataSet.size();
out.writeInt(size);
if (size > 0) {
OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
for (T element : dataSet){
serializer.serialize(element, wrapper);
}
}
}
......@@ -92,11 +96,17 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
for (int i = 0; i < collectionLength; i++){
T element = serializer.createInstance();
element = serializer.deserialize(element, wrapper);
list.add(element);
if (collectionLength > 0) {
try {
InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
for (int i = 0; i < collectionLength; i++){
T element = serializer.deserialize(wrapper);
list.add(element);
}
}
catch (Throwable t) {
throw new IOException("Error while deserializing element from collection", t);
}
}
dataSet = list;
......
......@@ -246,11 +246,8 @@ public class CollectionInputFormatTest {
in.readObject();
fail("should throw an exception");
}
catch (TestException e) {
// expected
}
catch (Exception e) {
fail("Exception not properly forwarded");
assertTrue(e.getCause() instanceof TestException);
}
}
catch (Exception e) {
......
......@@ -26,8 +26,7 @@ public class InputFormatVertex extends AbstractJobVertex {
private static final long serialVersionUID = 1L;
/** Caches the output format associated to this output vertex. */
private transient InputFormat<?, ?> inputFormat;
private String formatDescription;
public InputFormatVertex(String name) {
......@@ -39,19 +38,47 @@ public class InputFormatVertex extends AbstractJobVertex {
}
public void setFormatDescription(String formatDescription) {
this.formatDescription = formatDescription;
}
public String getFormatDescription() {
return formatDescription;
}
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
if (inputFormat == null) {
TaskConfig cfg = new TaskConfig(getConfiguration());
UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
if (wrapper == null) {
throw new Exception("No input format present in InputFormatVertex's task configuration.");
}
final TaskConfig cfg = new TaskConfig(getConfiguration());
// deserialize from the payload
UserCodeWrapper<InputFormat<?, ?>> wrapper;
try {
wrapper = cfg.getStubWrapper(loader);
}
catch (Throwable t) {
throw new Exception("Deserializing the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (wrapper == null) {
throw new Exception("No input format present in InputFormatVertex's task configuration.");
}
// instantiate, if necessary
InputFormat<?, ?> inputFormat;
try {
inputFormat = wrapper.getUserCodeObject(InputFormat.class, loader);
}
catch (Throwable t) {
throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
// configure
try {
inputFormat.configure(cfg.getStubParameters());
}
catch (Throwable t) {
throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
setInputSplitSource(inputFormat);
}
......
......@@ -31,10 +31,8 @@ public class OutputFormatVertex extends AbstractJobVertex {
private static final long serialVersionUID = 1L;
private String formatDescription;
/** Caches the output format associated to this output vertex. */
private transient OutputFormat<?> outputFormat;
/**
* Creates a new task vertex with the specified name.
*
......@@ -44,23 +42,45 @@ public class OutputFormatVertex extends AbstractJobVertex {
super(name);
}
public void setFormatDescription(String formatDescription) {
this.formatDescription = formatDescription;
}
public String getFormatDescription() {
return formatDescription;
}
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
if (this.outputFormat == null) {
TaskConfig cfg = new TaskConfig(getConfiguration());
UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
final TaskConfig cfg = new TaskConfig(getConfiguration());
if (wrapper == null) {
throw new Exception("No output format present in OutputFormatVertex's task configuration.");
}
this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
this.outputFormat.configure(cfg.getStubParameters());
UserCodeWrapper<OutputFormat<?>> wrapper;
try {
wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
}
catch (Throwable t) {
throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (wrapper == null) {
throw new Exception("No input format present in InputFormatVertex's task configuration.");
}
OutputFormat<?> outputFormat;
try {
outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
}
catch (Throwable t) {
throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
try {
outputFormat.configure(cfg.getStubParameters());
}
catch (Throwable t) {
throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
if (this.outputFormat instanceof InitializeOnMaster) {
((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
if (outputFormat instanceof InitializeOnMaster) {
((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册