提交 6a45336b 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] new api refactor

上级 e196ee63
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.5.1</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -38,8 +38,8 @@ public class DataStream<T extends Tuple> {
return context.addFlatMapFunction(this, flatMapper);
}
public <R extends Tuple> void addSink() {
context.addSink(this);
public <R extends Tuple> DataStream<R> addDummySink() {
return context.addDummySink(this);
}
// public <R> DataStream<R> map(MapFunction<T, R> mapper) {
......
......@@ -10,8 +10,8 @@ public class FlatMapInvokable<T extends Tuple, R extends Tuple> extends UserTask
private static final long serialVersionUID = 1L;
private FlatMapFunction<T, R> flatMapper;
public FlatMapInvokable(FlatMapFunction<T, R> flatMapper2) {
this.flatMapper = flatMapper2;
public FlatMapInvokable(FlatMapFunction<T, R> flatMapper) {
this.flatMapper = flatMapper;
}
@Override
......
......@@ -9,7 +9,9 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......@@ -57,7 +59,7 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable<T, R>(flatMapper),
baos.toByteArray());
"flatMap", baos.toByteArray());
jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
......@@ -87,27 +89,33 @@ public class StreamExecutionEnvironment {
// returnStream.getId());
// return returnStream;
// }
public static final class Sink extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
public static final class DummySink extends UserSinkInvokable<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
System.out.println("sink");
public void invoke(StreamRecord record, StreamCollector<Tuple1<String>> collector)
throws Exception {
for (Tuple tuple : record.getBatchIterable()) {
System.out.println(tuple);
}
}
}
public <T extends Tuple, R extends Tuple> DataStream<R> addSink(DataStream<T> inputStream) {
public <T extends Tuple, R extends Tuple> DataStream<R> addDummySink(DataStream<T> inputStream) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(new Sink());
oos.writeObject(new DummySink());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobGraphBuilder.setSink("sink", new FlatMapInvokableSink<Tuple1<String>, Tuple1<String>>(
new Sink()), baos.toByteArray());
jobGraphBuilder.setSink("sink", new DummySink(),"sink" ,baos.toByteArray());
jobGraphBuilder.shuffleConnect(inputStream.getId(), "sink");
return new DataStream<R>(this);
......@@ -130,7 +138,7 @@ public class StreamExecutionEnvironment {
e.printStackTrace();
}
jobGraphBuilder.setSource(returnStream.getId(), new DummySource(), baos.toByteArray());
jobGraphBuilder.setSource(returnStream.getId(), new DummySource(), "source",baos.toByteArray());
return returnStream;
}
......
......@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -34,8 +36,6 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......@@ -96,41 +96,7 @@ public class JobGraphBuilder {
this(jobGraphName, FaultToleranceType.NONE);
}
/**
* Adds a source component to the JobGraph with no parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableClass
* User defined class describing the source
*/
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
setSource(sourceName, InvokableClass, 1, 1);
}
/**
* Adds a source component to the JobGraph
*
* @param sourceName
* Name of the source component
* @param InvokableClass
* User defined class describing the source
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableClass, parallelism, subtasksPerInstance, source);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
}
}
/**
* Adds source to the JobGraph by user defined object with no parallelism
......@@ -141,10 +107,12 @@ public class JobGraphBuilder {
* User defined UserSourceInvokable object or other predefined
* source object
*/
public void setSource(String sourceName, UserSourceInvokable InvokableObject,
public void setSource(String sourceName, UserSourceInvokable InvokableObject,String operatorName,
byte[] serializedFunction) {
Configuration config = setSource(sourceName, InvokableObject, 1, 1);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
......@@ -174,48 +142,11 @@ public class JobGraphBuilder {
return config;
}
/**
* Adds a task component to the JobGraph with no parallelism
*
* @param taskName
* Name of the task component
* @param InvokableClass
* User defined class describing the task
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass) {
setTask(taskName, InvokableClass, 1, 1);
}
/**
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
* @param InvokableClass
* User defined class describing the task
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
* @return
*/
public Configuration setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass, int parallelism,
int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
Configuration config = setComponent(taskName, InvokableClass, parallelism,
subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
return config;
}
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject,
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject,String operatorName,
byte[] serializedFunction) {
Configuration config = setTask(taskName, TaskInvokableObject, 1, 1);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
/**
......@@ -255,52 +186,12 @@ public class JobGraphBuilder {
return config;
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableClass
* User defined class describing the sink
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass) {
setSink(sinkName, InvokableClass, 1, 1);
}
/**
* Adds a sink component to the JobGraph
*
* @param sinkName
* Name of the sink component
* @param InvokableClass
* User defined class describing the sink
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableClass, parallelism, subtasksPerInstance, sink);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
}
}
/**
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the sink component
* @param InvokableObject
* User defined UserSinkInvokable object
*/
public void setSink(String sinkName, UserSinkInvokable InvokableObject,
public void setSink(String sinkName, UserSinkInvokable InvokableObject,String operatorName,
byte[] serializedFunction) {
Configuration config = setSink(sinkName, InvokableObject, 1, 1);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
/**
......
......@@ -15,12 +15,11 @@
package eu.stratosphere.streaming.api;
import java.io.IOException;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......@@ -63,6 +62,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(streamRecord);
output.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
......
......@@ -19,7 +19,8 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
public abstract class UserSinkInvokable<T extends Tuple, R extends Tuple> implements RecordInvokable<R>, Serializable {
public abstract class UserSinkInvokable<IN extends Tuple> implements RecordInvokable<IN>,
Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -20,7 +20,7 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserTaskInvokable<IN, OUT extends Tuple> extends StreamInvokableComponent<OUT> implements
public abstract class UserTaskInvokable<IN extends Tuple, OUT extends Tuple> extends StreamInvokableComponent<OUT> implements
RecordInvokable<OUT>, Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -32,13 +32,13 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -131,16 +131,15 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
if (taskBase instanceof StreamTask || taskBase instanceof StreamSink) {
byte[] bytes = taskConfiguration.getBytes("operator", null);
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
if (operatorName.equals("flatMap")) {
ObjectInputStream in;
try {
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
......@@ -149,33 +148,46 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
} catch (Exception e) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
FlatMapFunction.class, f.getClass(), 1, null, null);
}
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
if (taskBase instanceof StreamTask) {
byte[] bytes = taskConfiguration.getBytes("operator", null);
} else if (operatorName.equals("sink")) {
ObjectInputStream in;
try {
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
UserSinkInvokable<Tuple> f = (UserSinkInvokable<Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSinkInvokable.class, f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
} else if (operatorName.equals("source")) {
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
FlatMapFunction.class, f.getClass(), 1, null, null);
UserSourceInvokable.class, f.getClass(), 0, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
collector = new StreamCollector<Tuple>(1, 1, outSerializationDelegate);
} catch (Exception e) {
} else {
throw new Exception();
}
collector = new StreamCollector<Tuple>(1, 1, outSerializationDelegate);
} catch (Exception e) {
throw new StreamComponentException("Nonsupported object passed as operator");
}
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
if (numberOfInputs < 2) {
if (taskBase instanceof StreamTask) {
......@@ -210,20 +222,14 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
if (taskBase instanceof StreamSource) {
byte[] bytes = taskConfiguration.getBytes("operator", null);
ObjectInputStream in;
try {
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, f.getClass(), 0, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
collector = new StreamCollector<Tuple>(1, 1, outSerializationDelegate);
......@@ -231,17 +237,18 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
outputPartitioner));
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
......
......@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
......@@ -17,12 +17,13 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
......@@ -64,7 +65,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(taskBase);
super(taskBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
......@@ -84,7 +85,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
super(outputBase);
super(outputBase,MutableRecordDeserializerFactory.<StreamRecord>get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
......
......@@ -19,8 +19,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
......@@ -47,6 +47,7 @@ public class StreamSink extends AbstractOutputTask {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamSinkHelper.setSerializers(taskConfiguration);
inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
} catch (Exception e) {
if (log.isErrorEnabled()) {
......
......@@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......@@ -75,6 +75,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamSourceHelper.setSerializers(taskConfiguration);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
if (log.isErrorEnabled()) {
......
......@@ -23,10 +23,10 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......@@ -68,6 +68,7 @@ public class StreamTask extends AbstractTask {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
streamTaskHelper.setSerializers(taskConfiguration);
inputs = streamTaskHelper.getConfigInputs(this, taskConfiguration);
streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
......
......@@ -17,10 +17,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
......@@ -20,7 +20,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceCounter;
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Grouping by a key
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Group to the partitioner with the lowest id
......
......@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.partitioner;
import java.util.Random;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Randomly group, to distribute equally
......
......@@ -10,7 +10,6 @@ import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
......@@ -27,15 +26,6 @@ import eu.stratosphere.util.Collector;
public class DataStreamTest {
public static final class MyMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple1<String> map(Tuple1<String> value) throws Exception {
System.out.println("in map: " + value.f0);
return new Tuple1<String>("hahahahaha");
}
}
public static final class MyFlatMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
@Override
......@@ -54,9 +44,7 @@ public class DataStreamTest {
// DataStream<Tuple1<String>> dataStream =
// context.setDummySource().map(new MyMap());
DataStream<Tuple1<String>> dataStream = context.setDummySource().flatMap(new MyFlatMap());
dataStream.addSink();
DataStream<Tuple1<String>> dataStream = context.setDummySource().flatMap(new MyFlatMap()).addDummySink();
context.execute();
......@@ -68,6 +56,7 @@ public class DataStreamTest {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
......@@ -99,11 +88,11 @@ public class DataStreamTest {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
UserSinkInvokable<Tuple> f = (UserSinkInvokable<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
FlatMapFunction.class, f.getClass(), 0, null, null);
UserSinkInvokable.class, f.getClass(), 0, null, null);
System.out.println(ts);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册