提交 8da89a74 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Added FileSourceFunction

上级 ed422377
package eu.stratosphere.api.datastream;
import java.util.Iterator;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task invokable that applies a user-supplied {@link GroupReduceFunction} to
 * the batch of tuples carried by each incoming {@link StreamRecord}.
 *
 * @param <IN>
 *            type of the tuples consumed by the reduce function
 * @param <OUT>
 *            type of the tuples emitted by the reduce function
 */
public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskInvokable<IN, OUT> {
    private static final long serialVersionUID = 1L;

    /** The user function invoked once per incoming record. */
    private GroupReduceFunction<IN, OUT> reducer;

    /**
     * @param reduceFunction
     *            the reduce function to run on every batch
     */
    public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction) {
        reducer = reduceFunction;
    }

    /**
     * Passes an iterator over the record's batch to the reduce function,
     * which writes its results to the given collector.
     */
    @Override
    public void invoke(StreamRecord record, StreamCollector<OUT> collector) throws Exception {
        // Unchecked cast: the record's batch iterator is treated as an
        // iterator over IN tuples.
        Iterator<IN> batchTuples = (Iterator<IN>) record.getBatchIterable().iterator();
        reducer.reduce(batchTuples, collector);
    }
}
......@@ -21,12 +21,11 @@ import java.util.Random;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.types.TypeInformation;
// TODO: let the user supply the batch size, put it in the config, and set it in StreamComponentHelper for the collector
// TODO: batchReduce -> tuple iterator over the tuple batch as input, output tuples emitted by the reduce function
public class DataStream<T extends Tuple> {
private final StreamExecutionEnvironment context;
......@@ -95,6 +94,10 @@ public class DataStream<T extends Tuple> {
return context.addMapFunction(this, mapper);
}
/**
 * Applies a batch reduce to this stream.
 * <p>
 * Note that although this overload is named {@code flatMap}, it accepts a
 * {@link GroupReduceFunction} and registers it through
 * {@code addBatchReduceFunction} on the execution environment.
 *
 * @param reducer
 *            the reduce function to apply
 * @return the stream returned by the execution environment
 */
public <R extends Tuple> DataStream<R> flatMap(GroupReduceFunction<T, R> reducer) {
    DataStream<R> reducedStream = context.addBatchReduceFunction(this, reducer);
    return reducedStream;
}
/**
 * Attaches the given sink function to this stream by delegating to the
 * execution environment.
 *
 * @param sinkFunction
 *            the sink to attach
 * @return the stream returned by the execution environment
 */
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
    DataStream<T> sinkStream = context.addSink(this, sinkFunction);
    return sinkStream;
}
......
package eu.stratosphere.api.datastream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Source function that reads a text file line by line and emits every
 * non-empty line as a {@code Tuple1<String>}.
 */
public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    /** Path of the file to read. */
    private final String path;

    /** Single tuple instance that is refilled and re-emitted for every line. */
    private Tuple1<String> outTuple = new Tuple1<String>();

    /**
     * @param path
     *            path of the text file to read
     */
    public FileSourceFunction(String path) {
        this.path = path;
    }

    /**
     * Reads the file at {@code path} and collects each non-empty line.
     *
     * @param collector
     *            receives one tuple per non-empty line
     * @throws IOException
     *             if the file cannot be opened or read
     */
    @Override
    public void invoke(Collector<Tuple1<String>> collector) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(path));
        try {
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                // Compare by content: a reference comparison against "" is
                // true for every line read from a file, so empty lines would
                // never be skipped.
                if (!line.isEmpty()) {
                    outTuple.f0 = line;
                    collector.collect(outTuple);
                }
            }
        } finally {
            // Close the reader even when reading or collecting fails.
            br.close();
        }
    }
}
......@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -34,10 +35,15 @@ import eu.stratosphere.util.Collector;
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
private static final int BATCH_SIZE = 1;
/**
 * Creates an execution environment whose job graph is built with the given
 * batch size.
 *
 * @param batchSize
 *            batch size handed to the job graph builder; must be at least 1
 * @throws IllegalArgumentException
 *             if {@code batchSize} is smaller than 1
 */
public StreamExecutionEnvironment(int batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException("Batch size must be positive.");
    }

    this.jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, batchSize);
}
/**
 * Creates an execution environment with a batch size of 1 by delegating to
 * the batch-size constructor.
 */
public StreamExecutionEnvironment() {
    // The explicit constructor call must be the first statement; the job
    // graph builder is created by the delegated constructor.
    this(1);
}
private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
......@@ -49,7 +55,6 @@ public class StreamExecutionEnvironment {
collector.collect(new Tuple1<String>("source"));
}
}
}
public static enum ConnectionType {
......@@ -122,6 +127,27 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
 * Adds a batch reduce task to the job graph and connects it to the given
 * input stream.
 * <p>
 * The reduce function is Java-serialized and passed to the job graph builder
 * together with a {@link BatchReduceInvokable} wrapping it, under the
 * operator name {@code "batchReduce"}.
 *
 * @param inputStream
 *            stream whose output feeds the new task
 * @param reducer
 *            the reduce function to run
 * @return the stream representing the output of the new task
 * @throws RuntimeException
 *             if the reduce function cannot be serialized
 */
public <T extends Tuple, R extends Tuple> DataStream<R> addBatchReduceFunction(
        DataStream<T> inputStream, final GroupReduceFunction<T, R> reducer) {
    DataStream<R> returnStream = new DataStream<R>(this);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        try {
            oos.writeObject(reducer);
        } finally {
            // Closing flushes any buffered bytes into baos.
            oos.close();
        }
    } catch (IOException e) {
        // Fail here instead of configuring the task with incomplete bytes
        // that would only break later when the function is deserialized.
        throw new RuntimeException("Cannot serialize the batch reduce function", e);
    }

    jobGraphBuilder.setTask(returnStream.getId(), new BatchReduceInvokable<T, R>(reducer),
            "batchReduce", baos.toByteArray());

    connectGraph(inputStream, returnStream.getId());

    return returnStream;
}
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
DataStream<T> returnStream = new DataStream<T>(this);
......@@ -182,6 +208,10 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
 * Adds a source backed by a {@link FileSourceFunction} for the given path.
 *
 * @param path
 *            path handed to the file source function
 * @return the stream returned by {@code addSource}
 */
public DataStream<Tuple1<String>> addFileSource(String path) {
    FileSourceFunction fileSource = new FileSourceFunction(path);
    return addSource(fileSource);
}
public DataStream<Tuple1<String>> addDummySource() {
DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
......
......@@ -64,7 +64,7 @@ public class JobGraphBuilder {
protected String maxParallelismVertexName;
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
/**
 * Batch size written to component configurations under the "batchSize" key;
 * set to 1 unless the constructor taking a batch size is used.
 */
private int batchSize;
/**
* Creates a new JobGraph with the given name
*
......@@ -84,6 +84,7 @@ public class JobGraphBuilder {
log.debug("JobGraph created");
}
this.faultToleranceType = faultToleranceType;
batchSize = 1;
}
/**
......@@ -97,6 +98,11 @@ public class JobGraphBuilder {
this(jobGraphName, FaultToleranceType.NONE);
}
/**
 * Creates a new JobGraph with the given name, fault tolerance type and
 * batch size. Delegates to the two-argument constructor and then stores
 * the batch size.
 *
 * @param jobGraphName
 *            Name of the JobGraph
 * @param faultToleranceType
 *            Fault tolerance type of the job
 * @param batchSize
 *            Batch size stored in this builder; not validated here
 */
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) {
this(jobGraphName,faultToleranceType);
this.batchSize = batchSize;
}
/**
* Adds source to the JobGraph by user defined object with no parallelism
*
......@@ -111,7 +117,6 @@ public class JobGraphBuilder {
Configuration config = setSource(sourceName, InvokableObject, 1, 1);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
/**
......@@ -250,7 +255,7 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
// config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.api.invokable;
import java.io.IOException;
import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
......@@ -25,6 +26,6 @@ public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamCompo
private static final long serialVersionUID = 1L;
public abstract void invoke(Collector<OUT> collector);
public abstract void invoke(Collector<OUT> collector) throws Exception;
}
......@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
......@@ -104,10 +105,12 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
/**
 * Creates the output collector for this component using the batch size
 * found in the task configuration, stores it in the {@code collector}
 * field and returns it.
 *
 * @param taskConfiguration
 *            configuration read for the "batchSize" entry
 * @param id
 *            id passed on to the collector
 * @param outputs
 *            record writers passed on to the collector
 * @return the newly created collector
 */
public StreamCollector<Tuple> setCollector(Configuration taskConfiguration, int id,
        List<RecordWriter<StreamRecord>> outputs) {
    // NOTE(review): -1 is passed to the collector when "batchSize" is absent
    // from the configuration; confirm that StreamCollector tolerates it.
    int batchSize = taskConfiguration.getInteger("batchSize", -1);

    // Build the collector exactly once; the earlier fixed-size (1) instance
    // was discarded immediately and has been dropped.
    collector = new StreamCollector<Tuple>(batchSize, id, outSerializationDelegate, outputs);
    return collector;
}
// TODO add type parameters to avoid redundant code
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
......@@ -147,6 +150,22 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
} else if (operatorName.equals("batchReduce")) {
GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
f.getClass(), 1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
} else if (operatorName.equals("sink")) {
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
......
......@@ -18,22 +18,22 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String, Integer>> {
public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
word = inTuple.f0;
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
......@@ -43,10 +43,10 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
wordCounts.put(word, 1);
}
outTuple.f0 = word;
outTuple.f1 = count;
outRecord.setString(0, word);
outRecord.setInteger(1, count);
return outTuple;
emit(outRecord);
// performanceCounter.count();
}
......
......@@ -15,41 +15,39 @@
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
public class WordCountSplitter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
// PerformanceCounter pCounter = new
// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000,
"/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 1000, true,
"/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
public void invoke(StreamRecord record) throws Exception {
words = inTuple.f0.split(" ");
words = record.getString(0).split(" ");
for (String word : words) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
// pCounter.count();
outputRecord.setString(0, word);
pTimer.startTimer();
emit(outputRecord);
pTimer.stopTimer();
pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
@Override
public String getResult() {
pCounter.writeCSV();
pTimer.writeCSV();
return "";
}
}
\ No newline at end of file
......@@ -16,6 +16,7 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
......@@ -67,15 +68,19 @@ public class FlatMapTest {
for (int i = 0; i < 5; i++) {
collector.collect(new Tuple1<String>("hi"));
}
}
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0);
fail();
} catch (IllegalArgumentException e) {
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment(2);
DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource());
DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册