提交 35438ec2 编写于 作者: S Stephan Ewen

InputVertices do not require an input format.

上级 ef623e9b
......@@ -429,8 +429,9 @@ public final class ExecutionGroupVertex {
*
* @param inputSplitType Input split type class
*/
public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) { this.inputSplitType =
inputSplitType; }
public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) {
this.inputSplitType = inputSplitType;
}
/**
* Returns the input splits assigned to this group vertex.
......@@ -438,7 +439,6 @@ public final class ExecutionGroupVertex {
* @return the input splits, possibly <code>null</code> if the group vertex does not represent an input vertex
*/
public InputSplit[] getInputSplits() {
return this.inputSplits;
}
......
......@@ -72,8 +72,8 @@ public class JobInputVertex extends AbstractJobInputVertex {
*/
@Override
public Class<? extends InputSplit> getInputSplitType() {
if(inputFormat == null){
throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
if (inputFormat == null){
return InputSplit.class;
}
return inputFormat.getInputSplitType();
......@@ -89,7 +89,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
@Override
public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
if (inputFormat == null){
throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
return null;
}
return inputFormat.createInputSplits(minNumSplits);
......
......@@ -13,12 +13,16 @@
package eu.stratosphere.test.runtime;
import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -26,20 +30,11 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.LogUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class NetworkStackThroughput {
......@@ -109,9 +104,6 @@ public class NetworkStackThroughput {
producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
TaskConfig inputConfig = new TaskConfig(producer.getConfiguration());
inputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyInputFormat()));
JobTaskVertex forwarder = null;
if (useForwarder) {
forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
......@@ -124,9 +116,6 @@ public class NetworkStackThroughput {
consumer.setNumberOfSubtasks(numSubtasks);
consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
TaskConfig outputConfig = new TaskConfig(consumer.getConfiguration());
outputConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(new DummyOutputFormat()));
if (useForwarder) {
producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
......@@ -269,45 +258,6 @@ public class NetworkStackThroughput {
}
}
public static final class DummyInputFormat extends GenericInputFormat {
private static final long serialVersionUID = 6891640958330871924L;
@Override
public void open(InputSplit split) throws IOException {
}
@Override
public boolean reachedEnd() throws IOException {
return false;
}
@Override
public Object nextRecord(Object reuse) throws IOException {
return null;
}
}
public static final class DummyOutputFormat implements OutputFormat<Record> {
@Override
public void configure(Configuration parameters) {
}
@Override
public void open(int taskNumber, int numTasks) {
}
@Override
public void writeRecord(Record record) {
}
@Override
public void close() {
}
}
// ------------------------------------------------------------------------
public void testThroughput() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册