提交 5f6cdb60 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Stratosphere dependency upgraded to 0.5.1

上级 51dbe6e9
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.5</stratosphere.version>
<stratosphere.version>0.5.1</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -24,14 +24,12 @@ import org.apache.log4j.Level;
import org.junit.BeforeClass;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
......@@ -43,7 +41,7 @@ public class StreamComponentTest {
public static class MySource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new ArrayStreamRecord(new Tuple[] {new Tuple1<Integer>()});
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
String out;
public MySource() {
......@@ -56,7 +54,7 @@ public class StreamComponentTest {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100; i++) {
record.getTuple(0).setField(0, i);
record.setField(0, i);
emit(record);
}
}
......@@ -83,8 +81,8 @@ public class StreamComponentTest {
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = record.getTuple(0).getField(0);
emit(new ArrayStreamRecord(new Tuple[] {new Tuple2<Integer, Integer>(i, i + 1)}));
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(i, i + 1)));
}
@Override
......@@ -108,8 +106,8 @@ public class StreamComponentTest {
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = record.getTuple(0).getField(0);
emit(new ArrayStreamRecord(new Tuple[] {new Tuple2<Integer, Integer>(-i - 1, -i - 2)}));
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(-i - 1, -i - 2)));
}
@Override
......@@ -130,8 +128,8 @@ public class StreamComponentTest {
@Override
public void invoke(StreamRecord record) throws Exception {
Integer k = record.getTuple(0).getField(0);
Integer v = record.getTuple(0).getField(1);
Integer k = record.getInteger(0);
Integer v = record.getInteger(1);
data.put(k, v);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册