提交 8457ed23 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] comments on examples

上级 07f53c39
......@@ -160,8 +160,8 @@ public class DataStream<T extends Tuple> {
return context.addSink(this.copy(), sinkFunction);
}
public DataStream<T> addDummySink() {
return context.addDummySink(this.copy());
public DataStream<T> print() {
return context.print(this.copy());
}
protected void setType(TypeInformation<T> type) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Source function that emits the lines of a text file as one-field string tuples.
 *
 * <p>Whenever the end of the file is reached, the file is opened again and read from the
 * beginning, so {@link #invoke} never returns normally; it only ends when an exception is
 * thrown (for example an {@link IOException} while opening or reading the file).
 *
 * <p>Empty lines are skipped. The same tuple instance is reused for every emitted line.
 */
public class FileStreamFunction extends SourceFunction<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    /** Location of the text file that is read over and over again. */
    private final String path;

    /** Output tuple reused for every emitted line. */
    private Tuple1<String> outTuple = new Tuple1<String>();

    /**
     * @param path
     *            path of the text file to stream
     */
    public FileStreamFunction(String path) {
        this.path = path;
    }

    /**
     * Reads the file line by line in an endless loop and emits every non-empty line.
     *
     * @param collector
     *            collector receiving one tuple per non-empty line
     * @throws IOException
     *             if the file cannot be opened, read or closed
     */
    @Override
    public void invoke(Collector<Tuple1<String>> collector) throws IOException {
        while (true) {
            BufferedReader br = new BufferedReader(new FileReader(path));
            try {
                String line = br.readLine();
                while (line != null) {
                    // Compare by content: a reference comparison against "" is true for
                    // every line returned by readLine(), so it would never filter anything.
                    if (!line.isEmpty()) {
                        outTuple.f0 = line;
                        collector.collect(outTuple);
                    }
                    line = br.readLine();
                }
            } finally {
                // Release the file handle even when reading or collecting fails.
                br.close();
            }
        }
    }
}
......@@ -460,10 +460,10 @@ public class JobGraphBuilder {
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger(
"numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), numberOfInstances.get(downStreamComponentName));
config.setInteger("numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1),
numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
......@@ -566,4 +566,13 @@ public class JobGraphBuilder {
setNumberOfJobOutputs();
return jobGraph;
}
/**
 * Copies the "operatorName" and "operator" configuration entries of one component to
 * another component.
 *
 * @param from
 *            name of the component whose entries are read
 * @param to
 *            name of the component whose entries are overwritten
 */
public void setBytesFrom(String from, String to) {
    Configuration source = components.get(from).getConfiguration();
    Configuration target = components.get(to).getConfiguration();

    // A missing entry in the source is propagated as null.
    String operatorName = source.getString("operatorName", null);
    target.setString("operatorName", operatorName);

    byte[] serializedOperator = source.getBytes("operator", null);
    target.setBytes("operator", serializedOperator);
}
}
......@@ -131,14 +131,21 @@ public class StreamExecutionEnvironment {
}
//TODO: link to JobGraph, JobVertex, user-defined spellcheck
// TODO: link to JobGraph, JobVertex, user-defined spellcheck
/**
* Internal function for passing the user defined functions to the JobGraph of the job.
* @param functionName name of the function
* @param inputStream input data stream
* @param function the user defined function
* @param functionInvokable the wrapping JobVertex instance
* @param parallelism number of parallel instances of the function
* Internal function for passing the user defined functions to the JobGraph
* of the job.
*
* @param functionName
* name of the function
* @param inputStream
* input data stream
* @param function
* the user defined function
* @param functionInvokable
* the wrapping JobVertex instance
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
......@@ -177,10 +184,10 @@ public class StreamExecutionEnvironment {
e.printStackTrace();
}
jobGraphBuilder.setSink("sink", new SinkInvokable<T>(sinkFunction), "sink",
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
baos.toByteArray(), parallelism, parallelism);
connectGraph(inputStream, "sink");
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
......@@ -190,19 +197,22 @@ public class StreamExecutionEnvironment {
return addSink(inputStream, sinkFunction, 1);
}
public static final class DummySink extends SinkFunction<Tuple1<String>> {
public static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> tuple) {
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
public <T extends Tuple> DataStream<T> addDummySink(DataStream<T> inputStream) {
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return addSink(inputStream, (SinkFunction<T>) new DummySink());
return returnStream;
}
public void execute() {
......@@ -226,12 +236,16 @@ public class StreamExecutionEnvironment {
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
baos.toByteArray(), parallelism, parallelism);
return returnStream;
return returnStream.copy();
}
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path), 1);
}
public DataStream<Tuple1<String>> readTextStream(String path) {
return addSource(new FileStreamFunction(path),1);
}
public DataStream<Tuple1<String>> addDummySource() {
DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
......
......@@ -121,8 +121,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollector2<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
......@@ -368,6 +368,15 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
/**
 * Makes the input side use the output tuple type.
 *
 * <p>Does nothing when no output serialization delegate is set. Otherwise the output tuple
 * type information becomes the input type information, and the input serializer and the
 * input deserialization delegate are recreated from it.
 */
public void setSinkSerializer() {
    if (outSerializationDelegate == null) {
        return;
    }

    inTupleTypeInfo = outTupleTypeInfo;
    inTupleSerializer = inTupleTypeInfo.createSerializer();
    inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
......
......@@ -46,6 +46,7 @@ public class StreamSink extends AbstractOutputTask {
try {
streamSinkHelper.setSerializers(taskConfiguration);
streamSinkHelper.setSinkSerializer();
inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
} catch (Exception e) {
if (log.isErrorEnabled()) {
......
......@@ -26,11 +26,11 @@ public class BasicTopology {
public static class BasicSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
Tuple1<String> tuple = new Tuple1<String>("streaming");
Tuple1<String> tuple = new Tuple1<String>("streaming");
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
// continuously emit a tuple
// emit a tuple continuously
while (true) {
collector.collect(tuple);
}
......@@ -53,9 +53,12 @@ public class BasicTopology {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM)
.map(new BasicMap(), PARALELISM);
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM).map(new BasicMap(), PARALELISM).addDummySink();
stream.print();
env.execute();
}
}
......@@ -85,7 +85,6 @@ public class CellInfoLocal {
Tuple1<String> outTuple = new Tuple1<String>();
// write information to String tuple based on the input tuple
@Override
public void flatMap(Tuple4<Boolean, Integer, Long, Integer> value,
Collector<Tuple1<String>> out) throws Exception {
......@@ -95,7 +94,8 @@ public class CellInfoLocal {
// QUERY
if (value.f0) {
lastMillis = value.f3;
outTuple.f0 = "QUERY:\t"+cellID+ ": " + engine.get(timeStamp, lastMillis, cellID);
outTuple.f0 = "QUERY:\t" + cellID + ": "
+ engine.get(timeStamp, lastMillis, cellID);
out.collect(outTuple);
}
// INFO
......@@ -107,20 +107,16 @@ public class CellInfoLocal {
}
}
// In this example two different sources are connected into one stream, and a function is applied to the connected stream.
// TODO add arguments
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env
.addSource(new QuerySource(), SOURCE_PARALELISM);
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
new QuerySource(), SOURCE_PARALELISM);
DataStream<Tuple1<String>> stream = env
.addSource(new InfoSource(), SOURCE_PARALELISM)
.connectWith(querySource)
.partitionBy(1)
.flatMap(new CellTask(), PARALELISM)
.addDummySink();
DataStream<Tuple1<String>> stream = env.addSource(new InfoSource(), SOURCE_PARALELISM)
.connectWith(querySource).partitionBy(1).flatMap(new CellTask(), PARALELISM);
stream.print();
env.execute();
}
......
......@@ -27,19 +27,25 @@ public class JoinLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
// This example will join two streams. One which emits people's grades and
// one which emits people's salaries.
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple3<String, String, Integer>> source1 = env
.addSource(new JoinSourceOne(), SOURCE_PARALELISM);
DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new JoinSourceOne(),
SOURCE_PARALELISM);
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = env
.addSource(new JoinSourceTwo(), SOURCE_PARALELISM).connectWith(source1).partitionBy(1)
.flatMap(new JoinTask(), PARALELISM).addSink(new JoinSink());
.addSource(new JoinSourceTwo(), SOURCE_PARALELISM)
.connectWith(source1)
.partitionBy(1)
.flatMap(new JoinTask(), PARALELISM)
.addSink(new JoinSink());
env.execute();
......
......@@ -32,6 +32,7 @@ public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outRecord.f0 = "salary";
......
......@@ -32,6 +32,7 @@ public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
......
......@@ -39,7 +39,10 @@ public class JoinTask extends
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
String streamId = value.f0;
String name = value.f1;
;
// Joins the input value with the already known values. If it is a grade
// then with the salaries, if it is a salary then with the grades. Also
// stores the new element.
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
for (Integer salary : salaryHashmap.get(name)) {
......
......@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
......@@ -120,15 +119,6 @@ public class IncrementalLearningSkeleton {
}
public static class IMLSink extends SinkFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Integer> inTuple) {
// do nothing
}
}
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
......@@ -144,8 +134,9 @@ public class IncrementalLearningSkeleton {
DataStream<Tuple1<Integer>> prediction =
env.addSource(new NewDataSource(), SOURCE_PARALELISM)
.connectWith(model)
.map(new Predictor(), PARALELISM)
.addSink(new IMLSink());
.map(new Predictor(), PARALELISM);
prediction.print();
env.execute();
}
......
......@@ -24,7 +24,6 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
......@@ -150,15 +149,6 @@ public class IncrementalOLS {
}
public static class IncOLSSink extends SinkFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> inTuple) {
System.out.println(inTuple);
}
}
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
......@@ -174,9 +164,10 @@ public class IncrementalOLS {
DataStream<Tuple1<Double>> prediction =
env.addSource(new NewDataSource(), SOURCE_PARALELISM)
.connectWith(model)
.map(new Predictor(), PARALELISM)
.addSink(new IncOLSSink());
.map(new Predictor(), PARALELISM);
prediction.print();
env.execute();
}
}
......@@ -29,19 +29,25 @@ public class WindowJoinLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> source1 = env
.addSource(new WindowJoinSourceOne(), SOURCE_PARALELISM);
DataStream<Tuple4<String, String, Integer, Long>> source1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM);
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = env
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALELISM).connectWith(source1).partitionBy(1)
.flatMap(new WindowJoinTask(), PARALELISM).addSink(new JoinSink());
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALELISM)
.connectWith(source1)
.partitionBy(1)
.flatMap(new WindowJoinTask(), PARALELISM)
.addSink(new JoinSink());
env.execute();
......
......@@ -33,6 +33,7 @@ public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, I
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
......
......@@ -33,6 +33,7 @@ public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, I
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
......
......@@ -63,6 +63,10 @@ public class WindowJoinTask extends
String streamId = value.f0;
String name = value.f1;
Long progress = value.f3;
// Joins the input value with the already known values on a given interval. If it is a grade
// then with the salaries, if it is a salary then with the grades. Also
// stores the new element.
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
......
......@@ -24,6 +24,8 @@ public class WindowSumLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
......
......@@ -24,6 +24,8 @@ public class WindowWordCountLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
// This example will count the occurrence of each word in the input file with a sliding window.
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
......
......@@ -29,6 +29,7 @@ public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>>
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private Long timestamp = 0L;
// Reads the lines of the input file and adds a timestamp to it.
@Override
public void invoke(Collector<Tuple2<String, Long>> collector) throws Exception {
BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
......
......@@ -26,6 +26,7 @@ public class WindowWordCountSplitter extends FlatMapFunction<Tuple2<String, Long
private Long timestamp = 0L;
private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
// Splits the lines according to the spaces. And adds the line's timestamp to them.
@Override
public void flatMap(Tuple2<String, Long> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
......
......@@ -31,6 +31,7 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
// Increments the counter of the occurrence of the input word
@Override
public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
word = inTuple.f0;
......
......@@ -22,19 +22,21 @@ import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountLocal {
// This example will count the occurrence of each word in the input file.
public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextFile("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountSplitter(), 1)
.partitionBy(0)
.map(new WordCountCounter(), 1)
.addSink(new WordCountSink());
.map(new WordCountCounter(), 1);
dataStream.print();
env.execute();
}
}
......@@ -32,9 +32,11 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
// Splits the lines according to the spaces.
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
......
......@@ -27,7 +27,7 @@ public class KafkaTopology {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:7077", "group", "topic", 1), SOURCE_PARALELISM)
.addDummySink();
.print();
context.execute();
}
......
......@@ -13,16 +13,28 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
package eu.stratosphere.streaming.performance;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.examples.wordcount.WordCountCounter;
import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountSink extends SinkFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public class WordCountPerformanceLocal {
public static void main(String[] args) {
@Override
public void invoke(Tuple2<String, Integer> inTuple) {
System.out.println(inTuple);
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountPerformanceSplitter(), 1)
.broadcast()
.map(new WordCountCounter(), 3);
dataStream.print();
env.execute();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.performance;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.util.Collector;
/**
 * Flat-map function that splits every incoming line on single spaces and emits each
 * resulting token as its own one-field tuple. Every emission is registered with a
 * {@code PerformanceCounter}.
 */
public class WordCountPerformanceSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    /** Output tuple reused for every emitted word. */
    private Tuple1<String> outTuple = new Tuple1<String>();

    // NOTE(review): the last constructor argument is a developer-local absolute path;
    // confirm it exists before running this on another machine.
    PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000,
            30000, "/home/judit/strato/perf/broadcast4.csv");

    /**
     * Emits one tuple per space-separated token of the input line and counts each emission.
     *
     * @param inTuple
     *            tuple holding the line to split in its first field
     * @param out
     *            collector receiving one tuple per token
     */
    @Override
    public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
        String[] words = inTuple.f0.split(" ");
        for (int i = 0; i < words.length; i++) {
            outTuple.f0 = words[i];
            out.collect(outTuple);
            pCounter.count();
        }
    }
}
......@@ -20,15 +20,15 @@ import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class RMQTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"), SOURCE_PARALELISM)
.addDummySink();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"),
SOURCE_PARALELISM).print();
context.execute();
}
}
\ No newline at end of file
......@@ -15,7 +15,9 @@
package eu.stratosphere.streaming.util;
public class PerformanceCounter extends PerformanceTracker {
import java.io.Serializable;
public class PerformanceCounter extends PerformanceTracker implements Serializable{
public PerformanceCounter(String name, int counterLength, int countInterval, String fname) {
super(name, counterLength, countInterval, fname);
......
......@@ -17,10 +17,11 @@ package eu.stratosphere.streaming.util;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class PerformanceTracker {
public class PerformanceTracker implements Serializable{
protected List<Long> timeStamps;
protected List<Long> values;
......@@ -86,6 +87,7 @@ public class PerformanceTracker {
if (dumpInterval > 0) {
if (ctime - lastDump > dumpInterval) {
System.out.println("csv-be iras!!");
writeCSV();
lastDump = ctime;
}
......
......@@ -19,6 +19,7 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import org.junit.Test;
......@@ -28,6 +28,7 @@ public class BatchTest {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private static int count = 0;
private static boolean partitionCorrect = true;
private static final class MySource extends SourceFunction<Tuple1<String>> {
......@@ -58,6 +59,18 @@ public class BatchTest {
}
}
/**
 * Sink that checks that all tuples it receives fall into the same bucket, where the bucket
 * is the hash code of the first field modulo 93. The first tuple fixes the expected bucket;
 * any later tuple with a different bucket sets {@code partitionCorrect} to false.
 */
private static final class MyPartitionSink extends SinkFunction<Tuple1<String>> {
    // -1000 marks "no tuple seen yet"; a remainder modulo 93 can never take that value.
    int hash = -1000;

    @Override
    public void invoke(Tuple1<String> tuple) {
        int bucket = tuple.f0.hashCode() % 93;
        if (hash == -1000) {
            hash = bucket;
            return;
        }
        if (hash != bucket) {
            partitionCorrect = false;
        }
    }
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
......@@ -74,4 +87,19 @@ public class BatchTest {
assertEquals(20, count);
}
/**
 * Builds a source -> flatMap -> batch(4) -> partitionBy(0) -> sink topology, executes it and
 * asserts that {@code partitionCorrect} is still true afterwards.
 */
@Test
public void partitionTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// NOTE(review): 93 is presumably the sink parallelism, chosen to match the modulus used
// in MyPartitionSink - confirm against DataStream.addSink.
DataStream<Tuple1<String>> dataStream = env
.addSource(new MySource(), SOURCE_PARALELISM)
.flatMap(new MyMap(), PARALELISM).batch(4)
.partitionBy(0)
.addSink(new MyPartitionSink(), 93);
env.execute();
// The flag is only ever cleared by MyPartitionSink, when it sees two different buckets.
assertTrue(partitionCorrect);
}
}
......@@ -33,6 +33,7 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("source "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
......@@ -62,6 +63,8 @@ public class MapTest {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
System.out.println("mymap "+map);
map++;
return new Tuple1<Integer>(value.f0 * value.f0);
}
}
......@@ -87,7 +90,6 @@ public class MapTest {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
counter++;
System.out.println(counter);
if (counter > 3)
threeInAll = false;
return new Tuple1<Integer>(value.f0 * value.f0);
......@@ -138,6 +140,7 @@ public class MapTest {
@Override
public void invoke(Tuple1<Integer> tuple) {
System.out.println("sink "+graphResult);
graphResult++;
}
}
......@@ -149,6 +152,7 @@ public class MapTest {
private static int fieldsResult = 0;
private static int diffFieldsResult = 0;
private static int graphResult = 0;
private static int map = 0;
private static final int PARALELISM = 1;
private static final int MAXSOURCE = 10;
private static boolean allInOne = false;
......@@ -244,19 +248,24 @@ public class MapTest {
}
// @Test
// public void graphTest() throws Exception {
// StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// DataStream<Tuple1<Integer>> dataStream = env
// .addSource(new MySource(), 2)
// .partitionBy(0)
// .map(new MyMap(), 3)
// .broadcast()
// .addSink(new MyGraphSink(),2);
//
// env.execute();
// assertEquals(40, graphResult);
// for(int i=0; i<1000; i++){
// System.out.println(i);
// StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// DataStream<Tuple1<Integer>> dataStream = env
// .addSource(new MySource(), 2)
// .partitionBy(0)
// .map(new MyMap(), 3)
// .broadcast()
// .addSink(new MyGraphSink(),2);
//
// env.execute();
// assertEquals(40, graphResult);
// graphResult=0;
// map=0;
// }
//
// }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;
/**
 * Runs a small source -> flatMap -> print job end to end. The test has no assertions; it
 * only requires that building and executing the job does not throw.
 */
public class PrintTest {

    /** Squares the integer field of each tuple and passes the string field through. */
    public static final class MyFlatMap extends
            FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
        @Override
        public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
                throws Exception {
            int squared = value.f0 * value.f0;
            out.collect(new Tuple2<Integer, String>(squared, value.f1));
        }
    }

    /** Emits the tuples (0, "test") through (9, "test") and then finishes. */
    public static final class MySource extends SourceFunction<Tuple2<Integer, String>> {
        @Override
        public void invoke(Collector<Tuple2<Integer, String>> collector) throws Exception {
            int i = 0;
            while (i < 10) {
                collector.collect(new Tuple2<Integer, String>(i, "test"));
                i++;
            }
        }
    }

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = new StreamExecutionEnvironment();

        env.addSource(new MySource(), 1)
                .flatMap(new MyFlatMap(), 1)
                .print();

        env.execute();
    }
}
......@@ -22,6 +22,9 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
......@@ -31,6 +34,8 @@ public class TestDataUtilTest {
// public void testDownload() throws FileNotFoundException, IOException {
// String fileToDownload = "hamlet.txt";
// String expectedFile = "hamletTestExpectation.txt";
//
// deleteFile(TestDataUtil.testDataDir + fileToDownload);
//
// TestDataUtil.download(fileToDownload);
//
......@@ -38,6 +43,12 @@ public class TestDataUtilTest {
// + fileToDownload));
// }
/**
 * Deletes the file at the given location if it exists; does nothing otherwise.
 *
 * @param fileLocation
 *            path of the file to remove
 * @throws IOException
 *             if the existing file cannot be deleted
 */
public void deleteFile(String fileLocation) throws IOException {
    Path target = Paths.get(fileLocation);
    if (!Files.exists(target)) {
        return;
    }
    Files.delete(target);
}
public boolean compareFile(String file1, String file2) throws FileNotFoundException,
IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册