提交 ce7a8996 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] addSink added to newapi

上级 ce56b146
...@@ -78,12 +78,16 @@ public class DataStream<T extends Tuple> { ...@@ -78,12 +78,16 @@ public class DataStream<T extends Tuple> {
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) { public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return context.addFlatMapFunction(this, flatMapper); return context.addFlatMapFunction(this, flatMapper);
} }
/**
 * Attaches the given sink function to this stream by delegating to the execution context.
 *
 * @param sinkFunction the sink function to attach to this stream
 * @return the stream returned by the context's {@code addSink} call
 */
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
final DataStream<T> sinkStream = context.addSink(this, sinkFunction);
return sinkStream;
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) { public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return context.addMapFunction(this, mapper); return context.addMapFunction(this, mapper);
} }
public <R extends Tuple> DataStream<R> addDummySink() { public DataStream<T> addDummySink() {
return context.addDummySink(this); return context.addDummySink(this);
} }
......
package eu.stratosphere.api.datastream;
import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
/**
 * Base class for user-defined sink functions that consume tuples of type {@code IN}.
 *
 * <p>The class is {@link Serializable} so that concrete sink functions can be written with Java object
 * serialization.
 *
 * @param <IN> the tuple type accepted by the sink
 */
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Consumes a single tuple.
 *
 * @param tuple the tuple handed to the sink
 */
public abstract void invoke(IN tuple);
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.api.datastream;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Sink invokable that unpacks each incoming {@link StreamRecord} and passes its tuples, one at a time and in
 * index order, to a wrapped {@link SinkFunction}.
 *
 * @param <IN> the tuple type accepted by the wrapped sink function
 */
public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
    private static final long serialVersionUID = 1L;

    /** User function that receives every tuple of every record. */
    private SinkFunction<IN> sinkFunction;

    /**
     * Creates an invokable around the given sink function.
     *
     * @param sinkFunction the function to call for each tuple
     */
    public SinkInvokable(SinkFunction<IN> sinkFunction) {
        this.sinkFunction = sinkFunction;
    }

    /**
     * Calls the wrapped sink function once per tuple contained in {@code record}.
     *
     * @param record the record whose tuples are consumed
     * @param collector not used by this implementation
     */
    @Override
    public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
        // The batch size is read once up front, before any tuple is processed.
        final int tupleCount = record.getBatchSize();
        int index = 0;
        while (index < tupleCount) {
            @SuppressWarnings("unchecked")
            IN current = (IN) record.getTuple(index);
            sinkFunction.invoke(current);
            index++;
        }
    }
}
...@@ -101,6 +101,28 @@ public class StreamExecutionEnvironment { ...@@ -101,6 +101,28 @@ public class StreamExecutionEnvironment {
return returnStream; return returnStream;
} }
/**
 * Registers a user-defined sink for the given stream.
 *
 * <p>The sink function is written with Java object serialization, passed to the job graph builder together with a
 * {@link SinkInvokable} wrapping it, and the input stream's connections are wired to the sink component.
 *
 * @param inputStream the stream whose connections are attached to the sink
 * @param sinkFunction the sink function to register; it must be serializable
 * @return a new {@link DataStream} created for this environment
 * @throws IllegalArgumentException if the sink function cannot be serialized
 */
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
        SinkFunction<T> sinkFunction) {
    DataStream<T> returnStream = new DataStream<T>(this);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(sinkFunction);
        // close() flushes any buffered data into baos before the bytes are read below.
        // Both streams are purely in-memory, so nothing leaks if writeObject throws first.
        oos.close();
    } catch (IOException e) {
        // Fail fast instead of registering a sink with missing or truncated serialized bytes.
        throw new IllegalArgumentException("Could not serialize sink function", e);
    }

    // NOTE(review): the component name "sink" is hard-coded, so several sinks in one job
    // presumably share the same name - confirm against JobGraphBuilder.setSink.
    jobGraphBuilder.setSink("sink", new SinkInvokable<T>(sinkFunction), "sink",
            baos.toByteArray());
    connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, inputStream.cparam);
    return returnStream;
}
public <T extends Tuple, R extends Tuple> DataStream<R> addMapFunction( public <T extends Tuple, R extends Tuple> DataStream<R> addMapFunction(
DataStream<T> inputStream, final MapFunction<T, R> mapper) { DataStream<T> inputStream, final MapFunction<T, R> mapper) {
DataStream<R> returnStream = new DataStream<R>(this); DataStream<R> returnStream = new DataStream<R>(this);
...@@ -120,39 +142,23 @@ public class StreamExecutionEnvironment { ...@@ -120,39 +142,23 @@ public class StreamExecutionEnvironment {
connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype, connectGraph(inputStream.connectIDs, returnStream.getId(), inputStream.ctype,
inputStream.cparam); inputStream.cparam);
return returnStream; return returnStream;
} }
public static final class DummySink extends UserSinkInvokable<Tuple1<String>> { public static final class DummySink extends SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override @Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception { public void invoke(Tuple1<String> tuple) {
for (Tuple tuple : record.getBatchIterable()) { System.out.println(tuple);
System.out.println(tuple);
}
} }
} }
public <T extends Tuple, R extends Tuple> DataStream<R> addDummySink(DataStream<T> inputStream) { public <T extends Tuple> DataStream<T> addDummySink(DataStream<T> inputStream) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(new DummySink());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobGraphBuilder.setSink("sink", new DummySink(), "sink", baos.toByteArray());
connectGraph(inputStream.connectIDs, "sink", inputStream.ctype, return addSink(inputStream, (SinkFunction<T>) new DummySink());
inputStream.cparam);
return new DataStream<R>(this);
} }
public void execute() { public void execute() {
......
...@@ -22,5 +22,6 @@ import eu.stratosphere.api.java.tuple.Tuple; ...@@ -22,5 +22,6 @@ import eu.stratosphere.api.java.tuple.Tuple;
public abstract class UserSinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, Tuple> public abstract class UserSinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, Tuple>
implements Serializable { implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
} }
\ No newline at end of file
...@@ -24,6 +24,7 @@ import java.util.List; ...@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
...@@ -148,10 +149,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -148,10 +149,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else if (operatorName.equals("sink")) { } else if (operatorName.equals("sink")) {
UserSinkInvokable<Tuple> f = (UserSinkInvokable<Tuple>) in.readObject(); SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo( inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSinkInvokable.class, f.getClass(), 0, null, null); SinkFunction.class, f.getClass(), 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer(); inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer); inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
......
...@@ -8,6 +8,7 @@ import java.io.ObjectInputStream; ...@@ -8,6 +8,7 @@ import java.io.ObjectInputStream;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream; import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
...@@ -34,6 +35,16 @@ public class FlatMapTest { ...@@ -34,6 +35,16 @@ public class FlatMapTest {
} }
} }
/** Test sink that prints every tuple it receives to standard output. */
public static final class MySink extends SinkFunction<Tuple1<String>> {
@Override
public void invoke(Tuple1<String> tuple) {
// Print the received tuple to standard output.
System.out.println(tuple);
}
}
@Test @Test
public void test() throws Exception { public void test() throws Exception {
Tuple1<String> tup = new Tuple1<String>("asd"); Tuple1<String> tup = new Tuple1<String>("asd");
...@@ -42,9 +53,9 @@ public class FlatMapTest { ...@@ -42,9 +53,9 @@ public class FlatMapTest {
DataStream<Tuple1<String>> dataStream0 = context.setDummySource(); DataStream<Tuple1<String>> dataStream0 = context.setDummySource();
DataStream<Tuple1<String>> dataStream1 = context.setDummySource().connectWith(dataStream0) DataStream<Tuple1<String>> dataStream1 = context.setDummySource().connectWith(dataStream0)
.partitionBy(0).flatMap(new MyFlatMap()).broadcast().addDummySink(); .partitionBy(0).flatMap(new MyFlatMap()).broadcast().addSink(new MySink());
context.execute(); context.execute();
JobGraphBuilder jgb = context.jobGB(); JobGraphBuilder jgb = context.jobGB();
...@@ -84,11 +95,11 @@ public class FlatMapTest { ...@@ -84,11 +95,11 @@ public class FlatMapTest {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
UserSinkInvokable<Tuple> f = (UserSinkInvokable<Tuple>) in.readObject(); SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass()); System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo( TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSinkInvokable.class, f.getClass(), 0, null, null); SinkFunction.class, f.getClass(), 0, null, null);
System.out.println(ts); System.out.println(ts);
......
...@@ -8,6 +8,7 @@ import java.io.ObjectInputStream; ...@@ -8,6 +8,7 @@ import java.io.ObjectInputStream;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.datastream.DataStream; import eu.stratosphere.api.datastream.DataStream;
import eu.stratosphere.api.datastream.SinkFunction;
import eu.stratosphere.api.datastream.StreamExecutionEnvironment; import eu.stratosphere.api.datastream.StreamExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
...@@ -22,6 +23,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex; ...@@ -22,6 +23,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent; import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
...@@ -71,7 +73,7 @@ public class MapTest { ...@@ -71,7 +73,7 @@ public class MapTest {
byte[] userFunctionSerialized = config.getBytes("serializedudf", null); byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized)); in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
StreamInvokableComponent userFunction = (StreamInvokableComponent) in.readObject(); UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
System.out.println(userFunction.getClass()); System.out.println(userFunction.getClass());
assertTrue(true); assertTrue(true);
System.out.println("----------------"); System.out.println("----------------");
...@@ -84,11 +86,11 @@ public class MapTest { ...@@ -84,11 +86,11 @@ public class MapTest {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
UserSinkInvokable<Tuple> f = (UserSinkInvokable<Tuple>) in.readObject(); SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass()); System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo( TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSinkInvokable.class, f.getClass(), 0, null, null); SinkFunction.class, f.getClass(), 0, null, null);
System.out.println(ts); System.out.println(ts);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册