提交 ff431e5a 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] BatchForward Refactor

上级 b933fcbf
......@@ -19,7 +19,7 @@ import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.StringValue;
public class MyBatchStreamSink implements UserSinkInvokable {
public class BachForwardSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
......
......@@ -19,13 +19,13 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
public class MyBatchStream extends TestBase2{
public class BatchForward extends TestBase2{
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", MyBatchStreamSource.class);
graphBuilder.setSink("StreamSink", MyBatchStreamSink.class);
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BachForwardSink.class);
graphBuilder.broadcastConnect("StreamSource", "StreamSink");
......
......@@ -15,22 +15,17 @@
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
public class MyBatchStreamSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final AtomRecord record=new AtomRecord();
private final StreamRecord mottoRecord=new StreamRecord();
public class BatchForwardSource extends UserSourceInvokable {
private final StringValue motto = new StringValue("Stratosphere Big Data looks tiny from here");
private final StreamRecord mottoRecord = new StreamRecord(motto);
@Override
public void invoke() throws Exception {
record.setField(0, new StringValue(motto));
mottoRecord.addRecord(record);
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10; i++) {
emit(mottoRecord);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册