提交 9a7b6d52 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] examples update

上级 8a2ad26f
......@@ -160,8 +160,8 @@ public class DataStream<T extends Tuple> {
return context.addSink(this.copy(), sinkFunction);
}
public DataStream<T> addDummySink() {
return context.addDummySink(this.copy());
public DataStream<T> print() {
return context.print(this.copy());
}
protected void setType(TypeInformation<T> type) {
......
......@@ -460,10 +460,10 @@ public class JobGraphBuilder {
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger(
"numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), numberOfInstances.get(downStreamComponentName));
config.setInteger("numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1),
numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
......@@ -566,4 +566,13 @@ public class JobGraphBuilder {
setNumberOfJobOutputs();
return jobGraph;
}
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
}
......@@ -131,14 +131,21 @@ public class StreamExecutionEnvironment {
}
//TODO: link to JobGraph, JobVertex, user-defined spellcheck
// TODO: link to JobGraph, JobVertex, user-defined spellcheck
/**
* Internal function for passing the user defined functions to the JobGraph of the job.
* @param functionName name of the function
* @param inputStream input data stream
* @param function the user defined function
* @param functionInvokable the wrapping JobVertex instance
* @param parallelism number of parallel instances of the function
* Internal function for passing the user defined functions to the JobGraph
* of the job.
*
* @param functionName
* name of the function
* @param inputStream
* input data stream
* @param function
* the user defined function
* @param functionInvokable
* the wrapping JobVertex instance
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
......@@ -177,10 +184,10 @@ public class StreamExecutionEnvironment {
e.printStackTrace();
}
jobGraphBuilder.setSink("sink", new SinkInvokable<T>(sinkFunction), "sink",
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
baos.toByteArray(), parallelism, parallelism);
connectGraph(inputStream, "sink");
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
......@@ -190,19 +197,22 @@ public class StreamExecutionEnvironment {
return addSink(inputStream, sinkFunction, 1);
}
public static final class DummySink extends SinkFunction<Tuple1<String>> {
public static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> tuple) {
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
public <T extends Tuple> DataStream<T> addDummySink(DataStream<T> inputStream) {
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
return addSink(inputStream, (SinkFunction<T>) new DummySink());
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
public void execute() {
......@@ -226,7 +236,7 @@ public class StreamExecutionEnvironment {
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
baos.toByteArray(), parallelism, parallelism);
return returnStream;
return returnStream.copy();
}
public DataStream<Tuple1<String>> readTextFile(String path) {
......
......@@ -121,8 +121,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollector2<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
......@@ -368,6 +368,15 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
}
}
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
......
......@@ -46,6 +46,7 @@ public class StreamSink extends AbstractOutputTask {
try {
streamSinkHelper.setSerializers(taskConfiguration);
streamSinkHelper.setSinkSerializer();
inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
} catch (Exception e) {
if (log.isErrorEnabled()) {
......
......@@ -26,7 +26,7 @@ public class BasicTopology {
public static class BasicSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
Tuple1<String> tuple = new Tuple1<String>("streaming");
Tuple1<String> tuple = new Tuple1<String>("streaming");
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
......@@ -53,9 +53,12 @@ public class BasicTopology {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM)
.map(new BasicMap(), PARALELISM);
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM).map(new BasicMap(), PARALELISM).addDummySink();
stream.print();
env.execute();
}
}
......@@ -94,7 +94,8 @@ public class CellInfoLocal {
// QUERY
if (value.f0) {
lastMillis = value.f3;
outTuple.f0 = "QUERY:\t"+cellID+ ": " + engine.get(timeStamp, lastMillis, cellID);
outTuple.f0 = "QUERY:\t" + cellID + ": "
+ engine.get(timeStamp, lastMillis, cellID);
out.collect(outTuple);
}
// INFO
......@@ -110,11 +111,12 @@ public class CellInfoLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env
.addSource(new QuerySource(), SOURCE_PARALELISM);
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
new QuerySource(), SOURCE_PARALELISM);
DataStream<Tuple1<String>> stream = env.addSource(new InfoSource(), SOURCE_PARALELISM)
.connectWith(querySource).partitionBy(1).flatMap(new CellTask(), PARALELISM).addDummySink();
.connectWith(querySource).partitionBy(1).flatMap(new CellTask(), PARALELISM);
stream.print();
env.execute();
}
......
......@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
......@@ -120,15 +119,6 @@ public class IncrementalLearningSkeleton {
}
public static class IMLSink extends SinkFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Integer> inTuple) {
// do nothing
}
}
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
......@@ -144,8 +134,9 @@ public class IncrementalLearningSkeleton {
DataStream<Tuple1<Integer>> prediction =
env.addSource(new NewDataSource(), SOURCE_PARALELISM)
.connectWith(model)
.map(new Predictor(), PARALELISM)
.addSink(new IMLSink());
.map(new Predictor(), PARALELISM);
prediction.print();
env.execute();
}
......
......@@ -24,7 +24,6 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
......@@ -150,15 +149,6 @@ public class IncrementalOLS {
}
public static class IncOLSSink extends SinkFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> inTuple) {
System.out.println(inTuple);
}
}
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
......@@ -174,9 +164,10 @@ public class IncrementalOLS {
DataStream<Tuple1<Double>> prediction =
env.addSource(new NewDataSource(), SOURCE_PARALELISM)
.connectWith(model)
.map(new Predictor(), PARALELISM)
.addSink(new IncOLSSink());
.map(new Predictor(), PARALELISM);
prediction.print();
env.execute();
}
}
......@@ -23,18 +23,18 @@ import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountLocal {
public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextFile("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountSplitter(), 1)
.partitionBy(0)
.map(new WordCountCounter(), 1)
.addSink(new WordCountSink());
.map(new WordCountCounter(), 1);
dataStream.print();
env.execute();
}
}
......@@ -27,7 +27,7 @@ public class KafkaTopology {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:7077", "group", "topic", 1), SOURCE_PARALELISM)
.addDummySink();
.print();
context.execute();
}
......
......@@ -19,24 +19,22 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.examples.wordcount.WordCountCounter;
import eu.stratosphere.streaming.examples.wordcount.WordCountSink;
import eu.stratosphere.streaming.examples.wordcount.WordCountSplitter;
import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountPerformanceLocal {
public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountPerformanceSplitter(), 1)
.broadcast()
.map(new WordCountCounter(), 3)
.addSink(new WordCountSink(), 2);
.map(new WordCountCounter(), 3);
dataStream.print();
env.execute();
}
}
......@@ -20,15 +20,15 @@ import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class RMQTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"), SOURCE_PARALELISM)
.addDummySink();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"),
SOURCE_PARALELISM).print();
context.execute();
}
}
\ No newline at end of file
......@@ -13,16 +13,45 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
package eu.stratosphere.streaming.api;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.util.Collector;
public class PrintTest {
public static final class MyFlatMap extends
FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
throws Exception {
out.collect(new Tuple2<Integer, String>(value.f0 * value.f0, value.f1));
}
}
public static final class MySource extends SourceFunction<Tuple2<Integer, String>> {
@Override
public void invoke(Collector<Tuple2<Integer, String>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
collector.collect(new Tuple2<Integer, String>(i, "test"));
}
}
}
@Test
public void test() throws Exception {
public class WordCountSink extends SinkFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, String>> source = env.addSource(new MySource(), 1);
DataStream<Tuple2<Integer, String>> map = source.flatMap(new MyFlatMap(), 1).print();
env.execute();
@Override
public void invoke(Tuple2<String, Integer> inTuple) {
//System.out.println(inTuple);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册