Commit 48512386 authored by ghermann, committed by Stephan Ewen

[streaming] Tested BatchReduce

Parent 8da89a74
@@ -94,7 +94,7 @@ public class DataStream<T extends Tuple> {
return context.addMapFunction(this, mapper);
}
public <R extends Tuple> DataStream<R> flatMap(GroupReduceFunction<T, R> reducer) {
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer) {
return context.addBatchReduceFunction(this, reducer);
}
......
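For reference, the renamed batchReduce operator is exercised end to end by the test added below; a minimal usage sketch (names taken from BatchReduceTest) looks like this:

    StreamExecutionEnvironment context = new StreamExecutionEnvironment(4);
    // batchReduce applies a GroupReduceFunction to batches of the incoming stream
    context.addSource(new MySource())
            .batchReduce(new MyBatchReduce())
            .addSink(new MySink());
    context.execute();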
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.api.datastream;
import java.io.BufferedReader;
......
@@ -117,9 +117,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
if (operatorName.equals("flatMap")) {
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
@@ -152,16 +150,17 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else if (operatorName.equals("batchReduce")) {
GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in.readObject();
GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in
.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
f.getClass(), 0, null, null);
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
GroupReduceFunction.class, f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(GroupReduceFunction.class,
f.getClass(), 1, null, null);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
GroupReduceFunction.class, f.getClass(), 1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
@@ -170,8 +169,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
SinkFunction.class, f.getClass(), 0, null, null);
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(SinkFunction.class,
f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
@@ -195,7 +194,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......
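Because the removed and added lines of the hunk above are interleaved, here is the resulting batchReduce branch in one piece: the serialized user function is read back, and TypeExtractor derives the input (type argument 0) and output (type argument 1) tuple types, from which serializers and delegates are built. Field names are those of StreamComponentHelper.

    } else if (operatorName.equals("batchReduce")) {
        // deserialize the user-supplied GroupReduceFunction from the operator bytes
        GroupReduceFunction<Tuple, Tuple> f = (GroupReduceFunction<Tuple, Tuple>) in
                .readObject();
        // input side: type argument 0 of GroupReduceFunction<IN, OUT>
        inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
                GroupReduceFunction.class, f.getClass(), 0, null, null);
        inTupleSerializer = inTupleTypeInfo.createSerializer();
        inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
        // output side: type argument 1
        outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
                GroupReduceFunction.class, f.getClass(), 1, null, null);
        outTupleSerializer = outTupleTypeInfo.createSerializer();
        outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);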
@@ -18,22 +18,22 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountCounter extends UserTaskInvokable {
public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
word = inTuple.f0;
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
@@ -43,10 +43,10 @@ public class WordCountCounter extends UserTaskInvokable {
wordCounts.put(word, 1);
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outTuple.f0 = word;
outTuple.f1 = count;
emit(outRecord);
return outTuple;
// performanceCounter.count();
}
......
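The counter now keeps its per-word totals in the HashMap across map() calls, so repeated words accumulate; a quick illustration of the new contract (a hypothetical standalone use, outside the streaming runtime):

    WordCountCounter counter = new WordCountCounter();
    counter.map(new Tuple1<String>("foo")); // -> (foo, 1)
    counter.map(new Tuple1<String>("bar")); // -> (bar, 1)
    counter.map(new Tuple1<String>("foo")); // -> (foo, 2); state survives between calls
    // note: the same outTuple instance is reused for every returned result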
@@ -15,39 +15,41 @@
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
import eu.stratosphere.util.Collector;
public class WordCountSplitter extends UserTaskInvokable {
public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000,
"/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 1000, true,
"/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
// PerformanceCounter pCounter = new
// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
@Override
public void invoke(StreamRecord record) throws Exception {
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
words = record.getString(0).split(" ");
words = inTuple.f0.split(" ");
for (String word : words) {
outputRecord.setString(0, word);
pTimer.startTimer();
emit(outputRecord);
pTimer.stopTimer();
pCounter.count();
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
// pCounter.count();
}
}
@Override
public String getResult() {
pCounter.writeCSV();
pTimer.writeCSV();
return "";
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
\ No newline at end of file
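With both word-count operators ported to the new function interfaces, a pipeline along the following lines becomes possible. This is a hypothetical sketch, not part of this commit: WordCountSource and WordCountSink are assumed names, and it assumes DataStream also exposes flatMap(FlatMapFunction) and map(MapFunction), as suggested by the flatMap branch in StreamComponentHelper and the addMapFunction call in the first hunk.

    // hypothetical wiring of the ported word-count operators
    StreamExecutionEnvironment env = new StreamExecutionEnvironment(4);
    env.addSource(new WordCountSource())      // assumed source emitting Tuple1<String> lines
            .flatMap(new WordCountSplitter()) // split each line into words
            .map(new WordCountCounter())      // running count per word
            .addSink(new WordCountSink());    // assumed sink printing (word, count) pairs
    env.execute();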
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import static org.junit.Assert.fail;
import java.util.Iterator;
import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.datastream.SourceFunction;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class BatchReduceTest {
public static final class MyBatchReduce extends
GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
@Override
public void reduce(Iterator<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
throws Exception {
Double sum = 0.;
Double count = 0.;
while (values.hasNext()) {
sum += values.next().f0;
count++;
}
out.collect(new Tuple1<Double>(sum / count));
System.out.println("batchReduce " + sum);
}
}
public static final class MySink extends SinkFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> tuple) {
System.out.println("AVG: " + tuple);
}
}
public static final class MySource extends SourceFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Collector<Tuple1<Double>> collector) {
for (Double i = 0.; i < 20; i++) {
collector.collect(new Tuple1<Double>(i));
}
}
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(4);
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource())
.batchReduce(new MyBatchReduce())
.addSink(new MySink());
context.execute();
}
}