提交 a5e1f0ae 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] set/get methods written for some basic types

上级 ebb18f21
......@@ -28,28 +28,6 @@ import java.util.List;
import java.util.UUID;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple10;
import eu.stratosphere.api.java.tuple.Tuple11;
import eu.stratosphere.api.java.tuple.Tuple12;
import eu.stratosphere.api.java.tuple.Tuple13;
import eu.stratosphere.api.java.tuple.Tuple14;
import eu.stratosphere.api.java.tuple.Tuple15;
import eu.stratosphere.api.java.tuple.Tuple16;
import eu.stratosphere.api.java.tuple.Tuple17;
import eu.stratosphere.api.java.tuple.Tuple18;
import eu.stratosphere.api.java.tuple.Tuple19;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple20;
import eu.stratosphere.api.java.tuple.Tuple21;
import eu.stratosphere.api.java.tuple.Tuple22;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple6;
import eu.stratosphere.api.java.tuple.Tuple7;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
......@@ -58,7 +36,6 @@ import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
/**
* Object for storing serializable records in batch (single records are
......@@ -73,13 +50,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private StringValue uid = new StringValue("");
private int numOfFields;
private int numOfRecords;
private Class<? extends Tuple> clazz = null;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
/**
......@@ -185,6 +155,87 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public String getString(int fieldNumber) {
try {
return (String) recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Integer getInteger(int fieldNumber) {
try {
return (Integer) recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Long getLong(int fieldNumber) {
try {
return (Long) recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Boolean getBoolean(int fieldNumber) {
try {
return (Boolean) recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Double getDouble(int fieldNumber) {
try {
return (Double) recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public String getString(int recordNumber, int fieldNumber) {
try {
return (String) recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Integer getInteger(int recordNumber, int fieldNumber) {
try {
return (Integer) recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Long getLong(int recordNumber, int fieldNumber) {
try {
return (Long) recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Boolean getBoolean(int recordNumber, int fieldNumber) {
try {
return (Boolean) recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
public Double getDouble(int recordNumber, int fieldNumber) {
try {
return (Double) recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
/**
* Sets a field in the given position of a specific record in the batch
......@@ -203,6 +254,87 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setString(int recordNumber, int fieldNumber, String o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setInteger(int recordNumber, int fieldNumber, Integer o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setLong(int recordNumber, int fieldNumber, Long o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setDouble(int recordNumber, int fieldNumber, Double o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setBoolean(int recordNumber, int fieldNumber, Boolean o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setString(int fieldNumber, String o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setInteger(int fieldNumber, Integer o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setLong(int fieldNumber, Long o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setDouble(int fieldNumber, Double o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
public void setBoolean(int fieldNumber, Boolean o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
/**
* Sets a field in the given position of the first record in the batch
......@@ -239,6 +371,23 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public Tuple getRecord() {
return getRecord(0);
}
public void getTupleInto(Tuple tuple){
if (tuple.getArity() == numOfFields) {
try {
Tuple source = recordBatch.get(0);
for(int i=0;i<numOfFields;i++){
tuple.setField(source.getField(i), i);
}
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
} else {
throw (new RecordSizeMismatchException());
}
}
/**
* Sets a record at the given position in the batch
......
......@@ -30,27 +30,47 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
public class StreamRecordTest {
@Test
public void singleRecordSetGetTest() {
StreamRecord record = new StreamRecord(new Tuple2<String, Integer>("Stratosphere",1));
StreamRecord record = new StreamRecord(new Tuple2<String, Integer>("Stratosphere", 1));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals("Stratosphere",record.getField(0));
assertEquals(1, record.getField(1));
assertEquals("Stratosphere", record.getString(0));
assertEquals((Integer) 1, record.getInteger(1));
record.setField(1, "Big Data");
assertEquals("Big Data", record.getField(1));
assertEquals("Big Data", record.getString(1));
record.setRecord(new Tuple2<String, Integer>("Big Data looks tiny from here.",2));
record.setRecord(new Tuple2<String, Long>("Big Data looks tiny from here.", 2L));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(2, record.getField(1));
assertEquals((Long) 2L, record.getLong(1));
record.setRecord(new Tuple2<String, Boolean>("Big Data looks tiny from here.", true));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(true, record.getBoolean(1));
record.setRecord(new Tuple2<String, Double>("Big Data looks tiny from here.", 2.5));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals((Double) 2.5, record.getDouble(1));
Tuple2<String, Double> tuple = new Tuple2<String, Double>();
record.getTupleInto(tuple);
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
record.setDouble(1,3.3);
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
}
@Test
......@@ -65,9 +85,9 @@ public class StreamRecordTest {
assertEquals(2, record.getNumOfFields());
assertEquals(2, record.getNumOfRecords());
assertEquals(1, record.getField(0, 0));
assertEquals(2, record.getField(1, 1));
assertEquals((Integer) 1, record.getInteger(0, 0));
assertEquals((Integer) 2, record.getInteger(1, 1));
record.setRecord(1, new Tuple2<Integer, Integer>(-1, -3));
assertEquals(-1, record.getField(1, 0));
......@@ -99,14 +119,14 @@ public class StreamRecordTest {
}
try {
a.setRecord(new Tuple2<String,String>("Data","Stratosphere"));
a.setRecord(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
}
StreamRecord b = new StreamRecord();
try {
b.addRecord(new Tuple2<String,String>("Data","Stratosphere"));
b.addRecord(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
}
......@@ -125,17 +145,15 @@ public class StreamRecordTest {
int num = 42;
String str = "above clouds";
StreamRecord rec = new StreamRecord(
new Tuple2<Integer, String>(num, str));
StreamRecord rec = new StreamRecord(new Tuple2<Integer, String>(num, str));
try {
rec.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buff.toByteArray()));
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord newRec = new StreamRecord();
newRec.read(in);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getRecord(0);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getRecord(0);
assertEquals(tupleOut.getField(0), 42);
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册