提交 3b5ec860 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] StreamRecord documentation update1

上级 e2a2a49f
......@@ -33,7 +33,7 @@ import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.RecordSizeMismatchException;
import eu.stratosphere.streaming.api.streamrecord.TupleSizeMismatchException;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
......@@ -201,7 +201,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
// TODO: write an exception class to throw forward
} catch (RecordSizeMismatchException e) {
} catch (TupleSizeMismatchException e) {
throw (e);
} catch (Exception e) {
e.printStackTrace();
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.api.streamrecord;
public class NoSuchRecordException extends StreamRecordException {
public class NoSuchTupleException extends StreamRecordException {
private static final long serialVersionUID = 4935457355434561574L;
......
......@@ -13,7 +13,7 @@
package eu.stratosphere.streaming.api.streamrecord;
public class RecordSizeMismatchException extends StreamRecordException {
public class TupleSizeMismatchException extends StreamRecordException {
/**
* Serial version UID for serialization interoperability.
......
......@@ -31,7 +31,7 @@ public class BatchWordCountSplitter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfRecords();
int numberOfRecords = record.getNumOfTuples();
for (int i = 0; i < numberOfRecords; ++i) {
words = record.getString(0).split(" ");
timestamp=record.getLong(1);
......
......@@ -38,7 +38,7 @@ public class StreamRecordTest {
StreamRecord record = new StreamRecord(new Tuple2<String, Integer>("Stratosphere", 1));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(1, record.getNumOfTuples());
assertEquals("Stratosphere", record.getString(0));
assertEquals((Integer) 1, record.getInteger(1));
......@@ -47,17 +47,17 @@ public class StreamRecordTest {
record.setRecord(new Tuple2<String, Long>("Big Data looks tiny from here.", 2L));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(1, record.getNumOfTuples());
assertEquals((Long) 2L, record.getLong(1));
record.setRecord(new Tuple2<String, Boolean>("Big Data looks tiny from here.", true));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(1, record.getNumOfTuples());
assertEquals(true, record.getBoolean(1));
record.setRecord(new Tuple2<String, Double>("Big Data looks tiny from here.", 2.5));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(1, record.getNumOfTuples());
assertEquals((Double) 2.5, record.getDouble(1));
Tuple2<String, Double> tuple = new Tuple2<String, Double>();
......@@ -80,11 +80,11 @@ public class StreamRecordTest {
try {
record.addRecord(new Tuple1<String>("4"));
fail();
} catch (RecordSizeMismatchException e) {
} catch (TupleSizeMismatchException e) {
}
assertEquals(2, record.getNumOfFields());
assertEquals(2, record.getNumOfRecords());
assertEquals(2, record.getNumOfTuples());
assertEquals((Integer) 1, record.getInteger(0, 0));
assertEquals((Integer) 2, record.getInteger(1, 1));
......@@ -92,7 +92,7 @@ public class StreamRecordTest {
assertEquals(-1, record.getField(1, 0));
assertEquals(2, record.getNumOfFields());
assertEquals(2, record.getNumOfRecords());
assertEquals(2, record.getNumOfTuples());
}
@Test
......@@ -115,20 +115,20 @@ public class StreamRecordTest {
try {
a.setRecord(4, new Tuple1<String>("Data"));
fail();
} catch (NoSuchRecordException e) {
} catch (NoSuchTupleException e) {
}
try {
a.setRecord(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
} catch (TupleSizeMismatchException e) {
}
StreamRecord b = new StreamRecord();
try {
b.addRecord(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
} catch (TupleSizeMismatchException e) {
}
try {
......@@ -153,7 +153,7 @@ public class StreamRecordTest {
StreamRecord newRec = new StreamRecord();
newRec.read(in);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getRecord(0);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getTuple(0);
assertEquals(tupleOut.getField(0), 42);
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册