提交 b9d0d0b5 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] TypeExtraction test works

上级 1971b67b
package eu.stratosphere.api.datastream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
......@@ -37,23 +39,22 @@ public class StreamExecutionEnvironment {
}
}
}
public <T extends Tuple, R extends Tuple> DataStream<R> addFlatMapFunction(DataStream<T> inputStream, final FlatMapFunction<T, R> flatMapper, TypeInformation<R> returnType) {
DataStream<R> returnStream = new DataStream<R>(this, returnType);
jobGraphBuilder.setTask(inputStream.getId(), new UserTaskInvokable<T, R>() {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record, StreamCollector<R> collector) throws Exception {
int batchSize = record.getBatchSize();
for (int i = 0; i < batchSize; i++) {
T tuple = (T) record.getTuple(i);
flatMapper.flatMap(tuple, collector);
// outRecord.setTuple(i, (Tuple) resultTuple);
}
}
});
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(flatMapper);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobGraphBuilder.setTask(returnStream.getId(), new FlatMapInvokable<T,R>(flatMapper), baos.toByteArray());
jobGraphBuilder.shuffleConnect(inputStream.getId(), returnStream.getId());
......@@ -84,13 +85,15 @@ public class StreamExecutionEnvironment {
/**
 * Registers a sink named "sink" on the job graph builder, connects the
 * component identified by {@code idToSink} to it via {@code shuffleConnect},
 * and runs the resulting job graph through
 * {@code ClusterUtil.runOnMiniCluster}.
 *
 * @param idToSink
 *            name of the component whose output is connected to the sink
 */
public void execute(String idToSink) {
// NOTE(review): "sink" is set twice below (an anonymous printing sink, then
// DefaultSinkInvokable). These look like the pre- and post-change lines of
// the same diff hunk -- confirm which registration is the current one.
jobGraphBuilder.setSink("sink", new UserSinkInvokable() {
@Override
public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
System.out.println("SINK: " + record);
}
});
jobGraphBuilder.setSink("sink", new DefaultSinkInvokable());
// new UserSinkInvokable() {
//
// @Override
// public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
// System.out.println("SINK: " + record);
// }
// });
// Connect the requested component to the sink, then run the job graph.
jobGraphBuilder.shuffleConnect(idToSink, "sink");
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
......
......@@ -57,12 +57,12 @@ public class JobGraphBuilder {
private static final Log log = LogFactory.getLog(JobGraphBuilder.class);
private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components;
private Map<String, Integer> numberOfInstances;
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
private FaultToleranceType faultToleranceType;
protected Map<String, AbstractJobVertex> components;
protected Map<String, Integer> numberOfInstances;
protected Map<String, List<Integer>> numberOfOutputChannels;
protected String maxParallelismVertexName;
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
/**
* Creates a new JobGraph with the given name
......@@ -168,7 +168,7 @@ public class JobGraphBuilder {
log.debug("SOURCE: " + sourceName);
}
}
/**
* Adds a task component to the JobGraph with no parallelism
*
......@@ -192,17 +192,24 @@ public class JobGraphBuilder {
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
* @return
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass,
public Configuration setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
// NOTE(review): two signature lines (void / Configuration) and two
// setComponent calls appear in this listing; they look like the old and new
// lines of one diff hunk -- confirm against the actual file.
// Create the task vertex in the job graph and set StreamTask as its task class.
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, InvokableClass, parallelism, subtasksPerInstance, task);
// Keep the configuration produced by setComponent so it can be returned.
Configuration config = setComponent(taskName, InvokableClass, parallelism, subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
return config;
}
/**
 * Adds a task component with parallelism 1 and 1 subtask per instance, and
 * stores the given serialized function in the task configuration under the
 * key "operator".
 *
 * @param taskName
 *            Name of the component
 * @param TaskInvokableObject
 *            User defined invokable object of the task
 * @param serializedFunction
 *            Serialized form of the user function, stored as-is
 */
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, byte[] serializedFunction) {
	// Delegate to the four-argument overload, then attach the bytes to the
	// configuration it returns.
	setTask(taskName, TaskInvokableObject, 1, 1).setBytes("operator", serializedFunction);
}
/**
* Adds a task component to the JobGraph with no parallelism
*
......@@ -226,15 +233,17 @@ public class JobGraphBuilder {
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
* @return
*/
public void setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
public Configuration setTask(String taskName, UserTaskInvokable TaskInvokableObject, int parallelism,
int subtasksPerInstance) {
// NOTE(review): two signature lines (void / Configuration) and two
// setComponent calls appear in this listing; they look like the old and new
// lines of one diff hunk -- confirm against the actual file.
// Create the task vertex in the job graph and set StreamTask as its task class.
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
// Keep the configuration produced by setComponent so it can be returned.
Configuration config = setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
return config;
}
/**
......@@ -320,7 +329,7 @@ public class JobGraphBuilder {
* AbstractJobVertex associated with the component
*/
private void setComponent(String componentName,
private Configuration setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
component.setNumberOfSubtasks(parallelism);
......@@ -334,11 +343,22 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
//config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
return config;
}
/**
 * Placeholder for producing the serialized form of the user function.
 *
 * @return always {@code null}; no serialization is performed here
 */
private byte[] getSerializedFunction() {
	// TODO: write the function through an ObjectOutputStream backed by a
	// ByteArrayOutputStream and return the resulting bytes.
	final byte[] serialized = null;
	return serialized;
}
private void setComponent(String componentName, UserSourceInvokable InvokableObject,
......@@ -349,12 +369,13 @@ public class JobGraphBuilder {
addSerializedObject(InvokableObject, component);
}
/**
 * Configures a task component from an invokable object: delegates to the
 * class-based setComponent overload using the object's runtime class, then
 * passes the object and the vertex to addSerializedObject.
 *
 * NOTE(review): two signature lines (void / Configuration) and two delegate
 * calls appear in this listing; they look like the old and new lines of one
 * diff hunk -- confirm against the actual file.
 *
 * @param componentName
 *            Name of the component
 * @param InvokableObject
 *            User defined task invokable object
 * @param parallelism
 *            Forwarded unchanged to the class-based overload
 * @param subtasksPerInstance
 *            Forwarded unchanged to the class-based overload
 * @param component
 *            AbstractJobVertex associated with the component
 * @return the configuration returned by the class-based overload
 */
private void setComponent(String componentName, UserTaskInvokable InvokableObject,
private Configuration setComponent(String componentName, UserTaskInvokable InvokableObject,
int parallelism, int subtasksPerInstance, AbstractJobVertex component) {
setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism, subtasksPerInstance,
component);
// Hand the invokable instance itself to the vertex in addition to its class.
addSerializedObject(InvokableObject, component);
return config;
}
private void setComponent(String componentName, UserSinkInvokable InvokableObject,
......
......@@ -15,16 +15,16 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
/**
 * Sink invokable that prints field 0 of tuple 0 of each incoming record to
 * standard output.
 */
public class DefaultSinkInvokable extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
// NOTE(review): the two signature lines above and the two identical println
// lines below look like old/new lines of one diff hunk -- confirm.
// Only tuple 0 / field 0 is read; the cast assumes that field holds a
// String -- TODO confirm for every stream connected to this sink.
String value = (String) record.getTuple(0).getField(0);
System.out.println(value);
System.out.println(value);
}
}
\ No newline at end of file
......@@ -20,7 +20,7 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserTaskInvokable<IN, OUT extends Tuple> extends StreamInvokableComponent implements
public abstract class UserTaskInvokable<IN, OUT extends Tuple> extends StreamInvokableComponent<IN, OUT> implements
RecordInvokable<OUT>, Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -121,15 +121,15 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
// TODO get deserialization delegates
// ObjectInputStream in = new ObjectInputStream(new
// ByteArrayInputStream(taskConfiguration.getBytes("operator", null)));
//
// MyGeneric<?> f = (MyGeneric<?>) in.readObject();
//
// TypeInformation<Tuple> ts =(TypeInformation<Tuple>)
// TypeExtractor.createTypeInfo(MyGeneric.class,
// f.getClass(), 0,
// null, null);
// ObjectInputStream in = new ObjectInputStream(new
// ByteArrayInputStream(taskConfiguration.getBytes("operator", null)));
//
// MyGeneric<?> f = (MyGeneric<?>) in.readObject();
//
// TypeInformation<Tuple> ts =(TypeInformation<Tuple>)
// TypeExtractor.createTypeInfo(MyGeneric.class,
// f.getClass(), 0,
// null, null);
TupleTypeInfo<Tuple> ts = null;
TupleSerializer<Tuple> tupleSerializer = ts.createSerializer();
......
......@@ -21,13 +21,14 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent implements Serializable {
public abstract class StreamInvokableComponent<IN, OUT extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
......
package eu.stratosphere.streaming.api;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.FlatMapInvokable;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.MyGeneric;
import eu.stratosphere.types.TypeInformation;
import eu.stratosphere.util.Collector;
public class DataStreamTest {
......@@ -24,31 +37,50 @@ public class DataStreamTest {
/**
 * Flat-map function from {@code Tuple1<String>} to {@code Tuple1<String>}
 * whose body is empty, so nothing is written to the collector.
 */
public static final class MyFlatMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {

	@Override
	public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
		// No-op: the input tuple is ignored and no output is collected.
	}
}
/**
 * Builds a stream from the dummy source with a MyFlatMap step, executes it,
 * and then, for every JobTaskVertex held by the job graph builder, reads the
 * "operator" bytes from its configuration, deserializes them as a
 * FlatMapFunction and prints the type information that TypeExtractor derives
 * for it.
 */
@Test
public void test() {
public void test() throws IOException, ClassNotFoundException {
// NOTE(review): the two signature lines above look like the old and new
// lines of one diff hunk -- confirm against the actual file.
// NOTE(review): tup is not used in the lines visible here.
Tuple1<String> tup = new Tuple1<String>("asd");
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
// DataStream<Tuple1<String>> dataStream = context.setDummySource().map(new MyMap());
// DataStream<Tuple1<String>> dataStream =
// context.setDummySource().map(new MyMap());
DataStream<Tuple1<String>> dataStream = context.setDummySource().flatMap(new MyFlatMap());
JobGraphBuilder jgb = context.jobGB();
context.execute(dataStream.getId());
//
// map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
//
// @Override
// public Tuple1<String> map(Tuple1<String> value) throws Exception {
// // TODO Auto-generated method stub
// return null;
// }
// });
// System.out.println(jgb.components);
// Inspect only task vertices.
for (AbstractJobVertex c : jgb.components.values()) {
if (c instanceof JobTaskVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
// NOTE(review): null is passed as the second argument, presumably the
// fallback for a missing "operator" entry; if so, a task vertex without
// that entry would hand null to ByteArrayInputStream -- verify.
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<?,?> f = (FlatMapFunction<?,?>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
// Ask TypeExtractor for type information of the deserialized function's
// class relative to FlatMapFunction (position argument 1).
TypeInformation<?> ts = (TypeInformation<?>) TypeExtractor.createTypeInfo(FlatMapFunction.class, f.getClass(), 1, null, null);
System.out.println(ts);
System.out.println("----------------");
}
}
// context.execute(dataStream.getId());
//
// map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
//
// @Override
// public Tuple1<String> map(Tuple1<String> value) throws Exception {
// // TODO Auto-generated method stub
// return null;
// }
// });
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册