提交 cde1d463 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] package refactor

上级 586edb62
package eu.stratosphere.api.datastream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private final String path;
private Tuple1<String> outTuple = new Tuple1<String>();
public FileSourceFunction(String path) {
this.path = path;
}
@Override
public void invoke(Collector<Tuple1<String>> collector) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (line != "") {
outTuple.f0 = line;
collector.collect(outTuple);
}
line = br.readLine();
}
br.close();
}
}
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import java.util.Iterator; import java.util.Iterator;
import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......
...@@ -13,17 +13,17 @@ ...@@ -13,17 +13,17 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.types.TypeInformation; import eu.stratosphere.types.TypeInformation;
public class DataStream<T extends Tuple> { public class DataStream<T extends Tuple> {
...@@ -94,7 +94,7 @@ public class DataStream<T extends Tuple> { ...@@ -94,7 +94,7 @@ public class DataStream<T extends Tuple> {
return context.addMapFunction(this, mapper); return context.addMapFunction(this, mapper);
} }
public <R extends Tuple> DataStream<R> flatMap(GroupReduceFunction<T, R> reducer) { public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer) {
return context.addBatchReduceFunction(this, reducer); return context.addBatchReduceFunction(this, reducer);
} }
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private final String path;
private Tuple1<String> outTuple = new Tuple1<String>();
public FileSourceFunction(String path) {
this.path = path;
}
@Override
public void invoke(Collector<Tuple1<String>> collector) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (line != "") {
outTuple.f0 = line;
collector.collect(outTuple);
}
line = br.readLine();
}
br.close();
}
}
...@@ -13,11 +13,10 @@ ...@@ -13,11 +13,10 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......
...@@ -13,11 +13,10 @@ ...@@ -13,11 +13,10 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import java.io.Serializable; import java.io.Serializable;
......
...@@ -13,10 +13,9 @@ ...@@ -13,10 +13,9 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.api.datastream; package eu.stratosphere.streaming.api;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
...@@ -24,7 +24,6 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction; ...@@ -24,7 +24,6 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil; import eu.stratosphere.streaming.util.ClusterUtil;
......
...@@ -24,7 +24,6 @@ import java.util.List; ...@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
...@@ -42,6 +41,7 @@ import eu.stratosphere.nephele.io.RecordWriter; ...@@ -42,6 +41,7 @@ import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
...@@ -117,9 +117,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -117,9 +117,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try { try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes)); ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
if (operatorName.equals("flatMap")) { if (operatorName.equals("flatMap")) {
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject(); FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
...@@ -152,16 +150,17 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -152,16 +150,17 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else if (operatorName.equals("batchReduce")) { } else if (operatorName.equals("batchReduce")) {
GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in.readObject(); GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in
.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
f.getClass(), 0, null, null); GroupReduceFunction.class, f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer(); inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer); inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class, outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
f.getClass(), 1, null, null); GroupReduceFunction.class, f.getClass(), 1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer(); outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer); outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
...@@ -170,8 +169,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -170,8 +169,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject(); SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class,
SinkFunction.class, f.getClass(), 0, null, null); f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer(); inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer); inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
...@@ -195,7 +194,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -195,7 +194,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} }
} }
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration) public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException { throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
package eu.stratosphere.streaming.examples.wordcount; package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WordCountLocal { public class WordCountLocal {
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
package eu.stratosphere.streaming.examples.wordcount; package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
public class WordCountSink extends SinkFunction<Tuple2<String, Integer>> { public class WordCountSink extends SinkFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import static org.junit.Assert.fail;
import java.util.Iterator;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class BatchReduceTest {
public static final class MyBatchReduce extends
GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
@Override
public void reduce(Iterator<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
throws Exception {
Double sum = 0.;
Double count = 0.;
while (values.hasNext()) {
sum += values.next().f0;
count++;
}
out.collect(new Tuple1<Double>(sum / count));
System.out.println("batchReduce " + sum);
}
}
public static final class MySink extends SinkFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> tuple) {
System.out.println("AVG: " + tuple);
}
}
public static final class MySource extends SourceFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Collector<Tuple1<Double>> collector) {
for (Double i = 0.; i < 20; i++) {
collector.collect(new Tuple1<Double>(i));
}
}
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(4);
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource()).batchReduce(new MyBatchReduce()).addSink(new MySink());
context.execute();
}
}
...@@ -23,10 +23,6 @@ import java.io.ObjectInputStream; ...@@ -23,10 +23,6 @@ import java.io.ObjectInputStream;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.datastream.SourceFunction;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
......
...@@ -22,9 +22,6 @@ import java.io.ObjectInputStream; ...@@ -22,9 +22,6 @@ import java.io.ObjectInputStream;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
......
package eu.stratosphere.streaming.api;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.types.TypeInformation;
public class TypeExtractTest {
public static class MySuperlass<T> implements Serializable{
}
public static class Myclass extends MySuperlass<Integer> {
private static final long serialVersionUID = 1L;
}
@Test
public void test() throws IOException, ClassNotFoundException {
Myclass f = new Myclass();
System.out.println(f.getClass().getGenericSuperclass());
TypeInformation<?> ts = TypeExtractor.createTypeInfo(MySuperlass.class, f.getClass(), 0,
null, null);
System.out.println(ts);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
oos = new ObjectOutputStream(baos);
oos.writeObject(f);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
System.out.println(new TupleTypeInfo<Tuple>(TypeExtractor.getForObject(in.readObject())));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册