提交 54a66d7a 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] using Sets for testing parallelism

上级 7b7ac3d0
......@@ -38,6 +38,9 @@ public class DataStream<T extends Tuple> {
List<Integer> cparams;
List<Integer> batchSizes;
/**
* Constructor
*/
protected DataStream() {
// TODO implement
environment = new StreamExecutionEnvironment();
......@@ -45,6 +48,10 @@ public class DataStream<T extends Tuple> {
initConnections();
}
/**
* Constructor
* @param context
*/
protected DataStream(StreamExecutionEnvironment environment) {
if (environment == null) {
throw new NullPointerException("context is null");
......@@ -57,6 +64,11 @@ public class DataStream<T extends Tuple> {
}
/**
* Constructor
* @param context
* @param id
*/
private DataStream(StreamExecutionEnvironment environment, String id) {
this(environment);
this.id = id;
......@@ -64,6 +76,9 @@ public class DataStream<T extends Tuple> {
//TODO: create copy method (or constructor) and copy datastream at every operator
/**
* Initialize the connections.
*/
private void initConnections() {
connectIDs = new ArrayList<String>();
connectIDs.add(getId());
......@@ -76,6 +91,11 @@ public class DataStream<T extends Tuple> {
}
/**
* Creates an identical datastream.
* @return
* The identical datastream.
*/
public DataStream<T> copy() {
DataStream<T> copiedStream = new DataStream<T>(environment, getId());
copiedStream.type = this.type;
......@@ -88,10 +108,22 @@ public class DataStream<T extends Tuple> {
return copiedStream;
}
/**
* Gets the id of the datastream.
* @return
* The id of the datastream.
*/
public String getId() {
return id;
}
/**
* Collects a number of consecutive elements from the datastream.
* @param batchSize
* The number of elements to collect.
* @return
* The collected elements.
*/
public DataStream<T> batch(int batchSize) {
DataStream<T> returnStream = copy();
......@@ -105,6 +137,13 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Connecting streams to each other.
* @param stream
* The stream it connects to.
* @return
* The new already connected datastream.
*/
public DataStream<T> connectWith(DataStream<T> stream) {
DataStream<T> returnStream = copy();
......@@ -115,6 +154,13 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Send the elements of the stream to the following vertices according to their hashcode.
* @param keyposition
* The field used to compute the hashcode.
* @return
* The original datastream.
*/
public DataStream<T> partitionBy(int keyposition) {
DataStream<T> returnStream = copy();
......@@ -125,6 +171,11 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Send the elements of the stream to every following vertices of the graph.
* @return
* The datastream.
*/
public DataStream<T> broadcast() {
DataStream<T> returnStream = copy();
......@@ -134,40 +185,109 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Sets the given flatmap function.
* @param flatMapper
* The object containing the flatmap function.
* @param paralelism
* The number of threads the function runs on.
* @return
* The modified datastream.
*/
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, int paralelism) {
return environment.addFunction("flatMap", this.copy(), flatMapper, new FlatMapInvokable<T, R>(
flatMapper), paralelism);
}
/**
* Sets the given map function.
* @param mapper
* The object containing the map function.
* @param paralelism
* The number of threads the function runs on.
* @return
* The modified datastream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int paralelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper), paralelism);
}
/**
* Sets the given batchreduce function.
* @param reducer
* The object containing the batchreduce function.
* @param batchSize
* The number of elements proceeded at the same time
* @param paralelism
* The number of threads the function runs on.
* @return
* The modified datastream.
*/
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer, int batchSize, int paralelism) {
return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer, new BatchReduceInvokable<T, R>(
reducer), paralelism);
}
/**
* Sets the given filter function.
* @param filter
* The object containing the filter function.
* @param paralelism
* The number of threads the function runs on.
* @return
* The modified datastream.
*/
public DataStream<T> filter(FilterFunction<T> filter, int paralelism) {
return environment.addFunction("filter", this.copy(), filter, new FilterInvokable<T>(filter), paralelism);
}
/**
* Sets the given sink function.
* @param sinkFunction
* The object containing the sink's invoke function.
* @param paralelism
* The number of threads the function runs on.
* @return
* The modified datastream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction, int paralelism) {
return environment.addSink(this.copy(), sinkFunction, paralelism);
}
/**
* Sets the given sink function.
* @param sinkFunction
* The object containing the sink's invoke function.
* @return
* The modified datastream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return environment.addSink(this.copy(), sinkFunction);
}
/**
* Prints the datastream.
* @return
* The original stream.
*/
public DataStream<T> print() {
return environment.print(this.copy());
}
/**
* Set the type parameter.
* @param type
* The type parameter.
*/
protected void setType(TypeInformation<T> type) {
this.type = type;
}
/**
* Get the type information.
* @return
* The type of the generic parameter.
*/
public TypeInformation<T> getType() {
return this.type;
}
......
......@@ -22,11 +22,9 @@ import java.io.ObjectOutputStream;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.util.Collector;
//TODO:add link to ExecutionEnvironment
/**
......@@ -34,8 +32,6 @@ import eu.stratosphere.util.Collector;
* construct streaming topologies.
*
*/
// TODO: add file, elements, rmq source
// TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
......@@ -73,17 +69,6 @@ public class StreamExecutionEnvironment {
return this;
}
private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
private static final long serialVersionUID = 1L;
public void invoke(Collector<Tuple1<String>> collector) {
for (int i = 0; i < 10; i++) {
collector.collect(new Tuple1<String>("source"));
}
}
}
/**
* Partitioning strategy on the stream.
*/
......@@ -92,10 +77,10 @@ public class StreamExecutionEnvironment {
}
/**
* Sets the batch size of the datastream in which the tuple are transmitted.
* Sets the batch size of the data stream in which the tuple are transmitted.
*
* @param inputStream
* input datastream
* input data stream
*/
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
......@@ -110,7 +95,7 @@ public class StreamExecutionEnvironment {
* Internal function for assembling the underlying JobGraph of the job.
*
* @param inputStream
* input datastream
* input data stream
* @param outputID
* ID of the output
*/
......@@ -138,7 +123,7 @@ public class StreamExecutionEnvironment {
}
// TODO: link to JobGraph, JobVertex, user-defined spellcheck
// TODO: link to JobGraph, JobVertex
/**
* Internal function for passing the user defined functions to the JobGraph
* of the job.
......@@ -177,6 +162,17 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
* Ads a sink to the data stream closing it.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
......@@ -199,12 +195,32 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
* Ads a sink to the data stream closing it. To parallelism is defaulted to
* 1.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
return addSink(inputStream, sinkFunction, 1);
}
public static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
// TODO: link to SinkFunction
/**
* Dummy implementation of the SinkFunction writing every tuple to the
* standard output.
*
* @param <IN>
* Input tuple type
*/
private static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
......@@ -214,6 +230,13 @@ public class StreamExecutionEnvironment {
}
/**
* Prints the tuples of the data stream to the standard output.
*
* @param inputStream
* the input data stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
......@@ -222,10 +245,24 @@ public class StreamExecutionEnvironment {
return returnStream;
}
// TODO: Link to JobGraph and ClusterUtil
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil.
*/
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
// TODO: Link to DataStream
/**
* Ads a data source thus opening a data stream.
*
* @param sourceFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
......@@ -246,6 +283,7 @@ public class StreamExecutionEnvironment {
return returnStream.copy();
}
//TODO: understand difference
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path), 1);
}
......@@ -254,24 +292,11 @@ public class StreamExecutionEnvironment {
return addSource(new FileStreamFunction(path), 1);
}
public DataStream<Tuple1<String>> addDummySource() {
DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(new DummySource());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobGraphBuilder.setSource(returnStream.getId(), new DummySource(), "source",
baos.toByteArray(), 1, 1);
return returnStream;
}
//TODO: Add link to JobGraphBuilder
/**
* Getter of the JobGraphBuilder of the streaming job.
* @return
*/
public JobGraphBuilder jobGB() {
return jobGraphBuilder;
}
......
......@@ -29,10 +29,10 @@ public class BasicTopology {
Tuple1<String> tuple = new Tuple1<String>("streaming");
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
// emit continuously a tuple
public void invoke(Collector<Tuple1<String>> out) throws Exception {
// continuously emit a tuple
while (true) {
collector.collect(tuple);
out.collect(tuple);
}
}
}
......
......@@ -30,8 +30,8 @@ public class CellInfoLocal {
private static Random rand = new Random();
private final static int CELL_COUNT = 10;
private final static int LAST_MILLIS = 1000;
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private final static int PARALELISM = 1;
private final static int SOURCE_PARALELISM = 1;
private final static class QuerySource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
......@@ -52,7 +52,7 @@ public class CellInfoLocal {
}
}
private final static class InfoSource extends
public final static class InfoSource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
private static final long serialVersionUID = 1L;
......@@ -60,7 +60,7 @@ public class CellInfoLocal {
false, 0, 0L, 0);
@Override
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> collector)
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> out)
throws Exception {
for (int i = 0; i < 1000; i++) {
Thread.sleep(100);
......@@ -68,7 +68,7 @@ public class CellInfoLocal {
tuple.f1 = rand.nextInt(CELL_COUNT);
tuple.f2 = System.currentTimeMillis();
collector.collect(tuple);
out.collect(tuple);
}
}
}
......@@ -85,6 +85,7 @@ public class CellInfoLocal {
Tuple1<String> outTuple = new Tuple1<String>();
// write information to String tuple based on the input tuple
@Override
public void flatMap(Tuple4<Boolean, Integer, Long, Integer> value,
Collector<Tuple1<String>> out) throws Exception {
......@@ -107,6 +108,7 @@ public class CellInfoLocal {
}
}
//In this example two different source then connect the two stream and apply a function for the connected stream
// TODO add arguments
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
......@@ -114,8 +116,12 @@ public class CellInfoLocal {
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
new QuerySource(), SOURCE_PARALELISM);
DataStream<Tuple1<String>> stream = env.addSource(new InfoSource(), SOURCE_PARALELISM)
.connectWith(querySource).partitionBy(1).flatMap(new CellTask(), PARALELISM);
DataStream<Tuple1<String>> stream = env
.addSource(new InfoSource(), SOURCE_PARALELISM)
.connectWith(querySource)
.partitionBy(1)
.flatMap(new CellTask(), PARALELISM);
stream.print();
env.execute();
......
......@@ -28,17 +28,17 @@ public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
collector.collect(outRecord);
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
out.collect(outTuple);
}
}
}
......@@ -28,17 +28,17 @@ public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
collector.collect(outRecord);
out.collect(outTuple);
}
}
}
......@@ -40,8 +40,9 @@ public class JoinTask extends
String streamId = value.f0;
String name = value.f1;
// From the input value that only contains the grade or the salary of a
// person generates a tuple that contains both the name and the salary
// Joins the input value with the already known values. If it is a grade
// then with the salaries, if it is a salary then with the grades. Also
// stores the new element.
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
for (Integer salary : salaryHashmap.get(name)) {
......
......@@ -59,10 +59,10 @@ public class IncrementalOLS {
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Double, Double[]>> collector) throws Exception {
public void invoke(Collector<Tuple2<Double, Double[]>> out) throws Exception {
while (true) {
collector.collect(getTrainingData());
out.collect(getTrainingData());
}
}
......
......@@ -38,17 +38,18 @@ public class WindowJoinLocal {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> source1 = env.addSource(
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM);
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = env
DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALELISM)
.connectWith(source1)
.connectWith(dataStream1)
.partitionBy(1)
.flatMap(new WindowJoinTask(), PARALELISM)
.addSink(new JoinSink());
dataStream2.print();
env.execute();
}
......
......@@ -28,17 +28,18 @@ public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
outRecord.f3 = progress;
collector.collect(outRecord);
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
outTuple.f3 = progress;
out.collect(outTuple);
progress += 1;
}
}
......
......@@ -28,17 +28,18 @@ public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outRecord.f3 = progress;
collector.collect(outRecord);
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
outTuple.f3 = progress;
out.collect(outTuple);
progress += 1;
}
}
......
......@@ -63,6 +63,10 @@ public class WindowJoinTask extends
String streamId = value.f0;
String name = value.f1;
Long progress = value.f3;
// Joins the input value with the already known values on a given interval. If it is a grade
// then with the salaries, if it is a salary then with the grades. Also
// stores the new element.
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
......
......@@ -24,10 +24,11 @@ public class WindowSumLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM)
.map(new WindowSumMultiple(), PARALELISM)
......
......@@ -24,6 +24,8 @@ public class WindowWordCountLocal {
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
// This example will count the occurrence of each word in the input file with a sliding window.
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
......
......@@ -29,6 +29,7 @@ public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>>
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private Long timestamp = 0L;
// Reads the lines of the input file and adds a timestamp to it.
@Override
public void invoke(Collector<Tuple2<String, Long>> collector) throws Exception {
BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
......
......@@ -26,6 +26,7 @@ public class WindowWordCountSplitter extends FlatMapFunction<Tuple2<String, Long
private Long timestamp = 0L;
private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
// Splits the lines according to the spaces. And adds the line's timestamp to them.
@Override
public void flatMap(Tuple2<String, Long> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
......
......@@ -31,6 +31,7 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
// Increments the counter of the occurrence of the input word
@Override
public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
word = inTuple.f0;
......@@ -47,8 +48,6 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
outTuple.f1 = count;
return outTuple;
// performanceCounter.count();
}
}
......@@ -22,6 +22,8 @@ import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountLocal {
// This example will count the occurrence of each word in the input file.
public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt");
......
......@@ -24,30 +24,13 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
// PerformanceCounter pCounter = new
// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
// Splits the lines according on spaces
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
// pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
\ No newline at end of file
......@@ -19,7 +19,6 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
......
......@@ -21,7 +21,9 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
......@@ -83,8 +85,8 @@ public class FlatMapTest {
}
private static final int PARALELISM = 1;
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
@Test
public void test() throws Exception {
......
......@@ -18,7 +18,9 @@ package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
......@@ -145,8 +147,8 @@ public class MapTest {
}
}
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
private static int broadcastResult = 0;
private static int shuffleResult = 0;
private static int fieldsResult = 0;
......
......@@ -51,6 +51,8 @@ public class PrintTest {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, String>> source = env.addSource(new MySource(), 1);
DataStream<Tuple2<Integer, String>> map = source.flatMap(new MyFlatMap(), 1).print();
DataStream<Tuple2<Integer, String>> map2 = source.flatMap(new MyFlatMap(), 1).print();
source.print();
env.execute();
......
......@@ -22,9 +22,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
......@@ -35,20 +32,12 @@ public class TestDataUtilTest {
// String fileToDownload = "hamlet.txt";
// String expectedFile = "hamletTestExpectation.txt";
//
// deleteFile(TestDataUtil.testDataDir + fileToDownload);
//
// TestDataUtil.download(fileToDownload);
//
// assertTrue(compareFile(TestDataUtil.testDataDir + expectedFile, TestDataUtil.testDataDir
// + fileToDownload));
// }
public void deleteFile(String fileLocation) throws IOException{
Path path = Paths.get(fileLocation);
if(Files.exists(path))
Files.delete(path);
}
public boolean compareFile(String file1, String file2) throws FileNotFoundException,
IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册