提交 53e54a60 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] StreamRecord update with faster serialization and removed copy...

[streaming] StreamRecord update with faster serialization and removed copy when adding/setting tuples
上级 a68d78fa
......@@ -27,8 +27,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.util.ByteBufferInputStream;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple10;
......@@ -55,13 +53,11 @@ import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.BasicArrayTypeInfo;
import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.IntValue;
/**
* Object for storing serializable records in batch (single records are
......@@ -69,7 +65,6 @@ import eu.stratosphere.types.IntValue;
* objects in Stratosphere stream processing. The elements of the batch are
* Tuples.
*/
// TODO: update documentation
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
......@@ -79,9 +74,10 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private int numOfTuples;
private int batchSize;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class,
Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class,
Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
......@@ -91,6 +87,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
/**
* Creates empty StreamRecord with number of fields set
*
* @param numOfFields
* number of fields
*/
public StreamRecord(int numOfFields) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
......@@ -98,6 +100,14 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Creates empty StreamRecord with number of fields and batch size set
*
* @param numOfFields
* Number of fields in the tuples
* @param batchSize
* Batch size
*/
public StreamRecord(int numOfFields, int batchSize) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
......@@ -168,7 +178,10 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return this;
}
public void InitRecords() {
/**
* Initializes the record batch elemnts to null
*/
public void initRecords() {
tupleBatch.clear();
for (int i = 0; i < batchSize; i++) {
tupleBatch.add(null);
......@@ -202,7 +215,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
Tuple tuple;
try {
tuple = tupleBatch.get(tupleNumber);
......@@ -216,21 +230,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
// public Object getFieldFast(int tupleNumber, int fieldNumber) throws
// NoSuchTupleException, NoSuchFieldException {
// Tuple tuple;
// try {
// tuple = tupleBatch.get(tupleNumber);
// } catch (IndexOutOfBoundsException e) {
// throw (new NoSuchTupleException());
// }
// try {
// return tuple.getFieldFast(fieldNumber);
// } catch (IndexOutOfBoundsException e) {
// throw (new NoSuchFieldException());
// }
// }
/**
* Get a Boolean from the given field of the first Tuple of the batch
*
......@@ -256,7 +255,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* , NoSuchFieldException
*/
// TODO: add exception for cast for all getters
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Boolean) getField(tupleNumber, fieldNumber);
}
......@@ -284,7 +284,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Byte) getField(tupleNumber, fieldNumber);
}
......@@ -297,7 +298,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Character getCharacter(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Character getCharacter(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return getCharacter(0, fieldNumber);
}
......@@ -312,7 +314,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Character) getField(tupleNumber, fieldNumber);
}
......@@ -340,7 +343,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Double) getField(tupleNumber, fieldNumber);
}
......@@ -368,7 +372,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Float) getField(tupleNumber, fieldNumber);
}
......@@ -396,7 +401,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Integer) getField(tupleNumber, fieldNumber);
}
......@@ -424,7 +430,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Long) getField(tupleNumber, fieldNumber);
}
......@@ -452,7 +459,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Short) getField(tupleNumber, fieldNumber);
}
......@@ -478,7 +486,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the field in the tuple
* @return value of the field as String
*/
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (String) getField(tupleNumber, fieldNumber);
}
......@@ -599,7 +608,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setCharacter(int tupleNumber, int fieldNumber, Character c) throws NoSuchFieldException {
public void setCharacter(int tupleNumber, int fieldNumber, Character c)
throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, c);
}
......@@ -822,7 +832,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
try {
......@@ -840,7 +851,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets the first tuple in the batch with a deep copy of the given tuple
* Sets the first tuple in the batch with the given tuple
*
* @param tuple
* Tuple to set
......@@ -851,8 +862,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets a tuple at the given position in the batch with a deep copy of the
* given tuple
* Sets a tuple at the given position in the batch with the given tuple
*
* @param tupleNumber
* Position of tuple in the batch
......@@ -861,10 +871,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
try {
tupleBatch.set(tupleNumber, copyTuple(tuple));
tupleBatch.set(tupleNumber, tuple);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
......@@ -875,7 +886,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* Checks if the number of fields are equal to the batch field size then
* adds the deep copy of Tuple to the end of the batch
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
......@@ -886,7 +897,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the deep copy of Tuple to the given position into the recordbatch
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
......@@ -895,7 +906,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, copyTuple(tuple));
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
......@@ -919,6 +930,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
*
* @return copy of the StreamRecord
* @throws IOException
*/
public StreamRecord copySerialized() throws IOException {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
......@@ -1016,7 +1034,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tuple
* @return byte array representing types
*/
byte[] tupleBasicTypesToByteArray(Tuple tuple) {
byte[] tupleTypesToByteArray(Tuple tuple) {
byte[] typeNums = new byte[numOfFields];
for (int i = 0; i < typeNums.length; i++) {
Class<? extends Object> type = tuple.getField(i).getClass();
......@@ -1066,10 +1084,10 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
* @param byte array representing types
* @param numberOfFields
* @return Class array of field types
* @return TypeInfo array of field types
*/
@SuppressWarnings("rawtypes")
TypeInformation[] tupleBasicTypesFromByteArray(byte[] representation, int numberOfFields) {
TypeInformation[] tupleTypesFromByteArray(byte[] representation) {
TypeInformation[] basicTypes = new TypeInformation[representation.length];
for (int i = 0; i < basicTypes.length; i++) {
switch (representation[i]) {
......@@ -1135,101 +1153,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return basicTypes;
}
static String typeStringFromByteArray(byte[] representation, int numberOfFields) {
StringBuilder typeInfo = new StringBuilder("Tuple");
typeInfo.append(numberOfFields + "<");
for (int i = 0; i < representation.length; i++) {
switch (representation[i]) {
case 0:
typeInfo.append("Boolean,");
break;
case 1:
typeInfo.append("Byte,");
break;
case 2:
typeInfo.append("Character,");
break;
case 3:
typeInfo.append("Double,");
break;
case 4:
typeInfo.append("Float,");
break;
case 5:
typeInfo.append("Integer,");
break;
case 6:
typeInfo.append("Long,");
break;
case 7:
typeInfo.append("Short,");
break;
case 8:
typeInfo.append("String,");
break;
case 9:
typeInfo.append("Boolean[],");
break;
case 10:
typeInfo.append("Byte[],");
break;
case 11:
typeInfo.append("Character[],");
break;
case 12:
typeInfo.append("Double[],");
break;
case 13:
typeInfo.append("Float[],");
break;
case 14:
typeInfo.append("Integer[],");
break;
case 15:
typeInfo.append("Long[],");
break;
case 16:
typeInfo.append("Short[],");
break;
case 17:
typeInfo.append("String[],");
break;
case 18:
typeInfo.append("boolean[],");
break;
case 19:
typeInfo.append("byte[],");
break;
case 20:
typeInfo.append("char[],");
break;
case 21:
typeInfo.append("double[],");
break;
case 22:
typeInfo.append("float[],");
break;
case 23:
typeInfo.append("int[],");
break;
case 24:
typeInfo.append("long[],");
break;
case 25:
typeInfo.append("short[],");
break;
default:
typeInfo.append("String,");
break;
}
}
typeInfo.deleteCharAt(typeInfo.length() - 1);
typeInfo.append(">");
return typeInfo.toString();
}
/**
* Writes tuple to the specified DataOutput
*
......@@ -1241,17 +1164,17 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
private void writeTuple(Tuple tuple, DataOutput out) throws IOException {
// TODO: exception for empty record - no getField
// TODO: better serialization logic
byte[] typeArray = tupleBasicTypesToByteArray(getTuple());
TypeInformation<? extends Tuple> typeInfo = TypeExtractor.getForObject(getTuple());
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(tupleSerializer);
byte[] typesInByte = tupleTypesToByteArray(getTuple());
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(
tupleSerializer);
serializationDelegate.setInstance(tuple);
out.writeInt(numOfFields);
out.write(typeArray);
out.write(typesInByte);
serializationDelegate.write(out);
}
......@@ -1263,16 +1186,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return Tuple read
* @throws IOException
*/
private Tuple readTuple(DataInput in) throws IOException {
private Tuple readTuple(DataInput in, int numberOfFields) throws IOException {
int numberOfFields = in.readInt();
byte[] typesInByte = new byte[numberOfFields];
in.readFully(typesInByte, 0, numberOfFields);
// @SuppressWarnings("rawtypes")
TypeInformation<?>[] basicTypes = tupleBasicTypesFromByteArray(typesInByte, numberOfFields);
// TODO:skip this part somehow
//String typeString = typeStringFromByteArray(typesInByte, numberOfFields);
TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = typeInfo.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
......@@ -1287,13 +1207,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
// Write the number of fields with an IntValue
(new IntValue(numOfFields)).write(out);
uid.write(out);
// Write the number of records with an IntValue
(new IntValue(numOfTuples)).write(out);
out.writeByte(numOfFields);
out.writeInt(numOfTuples);
for (Tuple tuple : tupleBatch) {
writeTuple(tuple, out);
......@@ -1308,21 +1226,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
uid = new UID();
uid.read(in);
// Get the number of fields
IntValue numOfFieldsValue = new IntValue(0);
numOfFieldsValue.read(in);
numOfFields = numOfFieldsValue.getValue();
numOfFields = in.readByte();
numOfTuples = in.readInt();
// Get the number of records
IntValue numOfRecordsValue = new IntValue(0);
numOfRecordsValue.read(in);
numOfTuples = numOfRecordsValue.getValue();
// Make sure the fields have numOfFields elements
tupleBatch = new ArrayList<Tuple>();
tupleBatch = new ArrayList<Tuple>(numOfTuples);
for (int k = 0; k < numOfTuples; ++k) {
tupleBatch.add(readTuple(in));
tupleBatch.add(readTuple(in, numOfFields));
}
}
......
......@@ -34,7 +34,7 @@ public class BasicTopology {
public static class BasicSource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String[]>(new String[] {"streaming", "flink"}));
StreamRecord record = new StreamRecord(new Tuple1<String>("streaming"));
@Override
public void invoke() throws Exception {
......@@ -64,6 +64,7 @@ public class BasicTopology {
@Override
public void invoke(StreamRecord record) throws Exception {
// do nothing
record.getField(0);
}
}
......
......@@ -63,7 +63,7 @@ public class IncrementalLearning {
@Override
public void invoke() throws Exception {
record.InitRecords();
record.initRecords();
while (true) {
for (int i = 0; i < BATCH_SIZE; i++) {
......
......@@ -26,9 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.junit.Ignore;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
......@@ -36,9 +34,9 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.TypeInformation;
public class StreamRecordTest {
......@@ -46,8 +44,8 @@ public class StreamRecordTest {
@Test
public void singleRecordSetGetTest() {
StreamRecord record = new StreamRecord(
new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>("Stratosphere", 1,
2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42));
new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>(
"Stratosphere", 1, 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42));
assertEquals(9, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
......@@ -125,7 +123,7 @@ public class StreamRecordTest {
Tuple5<String, Integer, Long, Boolean, Double> tuple = new Tuple5<String, Integer, Long, Boolean, Double>(
"Stratosphere", 1, 2L, true, 3.5);
record.addTuple(tuple);
record.addTuple(StreamRecord.copyTuple(tuple));
tuple.setField("", 0);
tuple.setField(0, 1);
......@@ -166,7 +164,8 @@ public class StreamRecordTest {
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, 2L, true, 3.5));
record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1,
2L, true, 3.5));
assertEquals("Stratosphere", record.getString(1, 0));
assertEquals((Integer) 1, record.getInteger(1, 1));
......@@ -184,7 +183,8 @@ public class StreamRecordTest {
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
record.addTuple(0, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, 2L, true, 3.5));
record.addTuple(0, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1,
2L, true, 3.5));
assertEquals(2, record.getNumOfTuples());
......@@ -227,58 +227,6 @@ public class StreamRecordTest {
}
// @Test
// public void getFieldSpeedTest() {
//
// final int ITERATION = 10000;
//
// StreamRecord record = new StreamRecord(new Tuple4<Integer, Long, String,
// String>(0, 42L, "Stratosphere",
// "Streaming"));
//
// long t = System.nanoTime();
// for (int i = 0; i < ITERATION; i++) {
// record.getField(0, i % 4);
// }
// long t2 = System.nanoTime() - t;
// System.out.println("Tuple5");
// System.out.println("getField:\t" + t2 + " ns");
//
// t = System.nanoTime();
// for (int i = 0; i < ITERATION; i++) {
// record.getFieldFast(0, i % 4);
// }
// t2 = System.nanoTime() - t;
// System.out.println("getFieldFast:\t" + t2 + " ns");
//
// StreamRecord record20 = new StreamRecord(
// new Tuple20<Integer, Long, String, String, String, String, String,
// String, String, String, String, String, String, String, String, String,
// String, String, String, String>(
// 0, 42L, "Stratosphere", "Streaming", "Stratosphere", "Stratosphere",
// "Streaming",
// "Stratosphere", "Streaming", "Streaming", "Stratosphere", "Streaming",
// "Stratosphere",
// "Streaming", "Streaming", "Stratosphere", "Streaming", "Stratosphere",
// "Streaming", "Streaming"));
//
// t = System.nanoTime();
// for (int i = 0; i < ITERATION; i++) {
// record20.getField(0, i % 20);
// }
// t2 = System.nanoTime() - t;
// System.out.println("Tuple20");
// System.out.println("getField:\t" + t2 + " ns");
//
// t = System.nanoTime();
// for (int i = 0; i < ITERATION; i++) {
// record20.getFieldFast(0, i % 20);
// }
// t2 = System.nanoTime() - t;
// System.out.println("getFieldFast:\t" + t2 + " ns");
//
// }
@Test
public void exceptionTest() {
StreamRecord a = new StreamRecord(new Tuple1<String>("Big"));
......@@ -316,7 +264,8 @@ public class StreamRecordTest {
int num = 42;
String str = "above clouds";
Integer[] intArray = new Integer[] { 1, 2 };
StreamRecord rec = new StreamRecord(new Tuple3<Integer, String, Integer[]>(num, str, intArray));
StreamRecord rec = new StreamRecord(new Tuple3<Integer, String, Integer[]>(num, str,
intArray));
try {
rec.write(out);
......@@ -325,7 +274,8 @@ public class StreamRecordTest {
StreamRecord newRec = new StreamRecord();
newRec.read(in);
@SuppressWarnings("unchecked")
Tuple3<Integer, String, Integer[]> tupleOut = (Tuple3<Integer, String, Integer[]>) newRec.getTuple(0);
Tuple3<Integer, String, Integer[]> tupleOut = (Tuple3<Integer, String, Integer[]>) newRec
.getTuple(0);
assertEquals(tupleOut.getField(0), 42);
assertEquals(str, tupleOut.getField(1));
......@@ -339,23 +289,24 @@ public class StreamRecordTest {
@Test
public void tupleCopyTest() {
Tuple3<String, Integer, Double[]> t1 = new Tuple3<String, Integer, Double[]>("a", 1, new Double[]{ 4.2 });
Tuple3<String, Integer, Double[]> t1 = new Tuple3<String, Integer, Double[]>("a", 1,
new Double[] { 4.2 });
@SuppressWarnings("rawtypes")
Tuple3 t2 = (Tuple3) StreamRecord.copyTuple(t1);
assertEquals("a", t2.getField(0));
assertEquals(1, t2.getField(1));
assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2));
assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
t1.setField(2, 1);
assertEquals(1, t2.getField(1));
assertEquals(2, t1.getField(1));
t1.setField(new Double[] {3.14}, 2);
assertArrayEquals(new Double[] {3.14}, (Double[]) t1.getField(2));
assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2));
t1.setField(new Double[] { 3.14 }, 2);
assertArrayEquals(new Double[] { 3.14 }, (Double[]) t1.getField(2));
assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
}
......@@ -363,12 +314,14 @@ public class StreamRecordTest {
@Test
public void tupleArraySerializationTest() throws IOException {
Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]> t1 = new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 },
new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l }, new Short[] { 12345 },
new String[] { "something" });
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
new Long[] { 12345678900l }, new Short[] { 12345 }, new String[] { "something" });
StreamRecord s1 = new StreamRecord(t1);
StreamRecord s2 = s1.copySerialized();
@SuppressWarnings("rawtypes")
Tuple9 t2 = (Tuple9) s2.getTuple();
assertArrayEquals(new Boolean[] { true }, (Boolean[]) t2.getField(0));
......@@ -396,49 +349,32 @@ public class StreamRecordTest {
@Test
public void typeCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord(
new Tuple9<Boolean, Byte, Character, Double, Float, Integer, Long, Short, String>((Boolean) true,
(Byte) (byte) 12, (Character) 'a', (Double) 12.5, (Float) (float) 13.5, (Integer) 1234,
(Long) 12345678900l, (Short) (short) 12345, "something"));
@SuppressWarnings({ "rawtypes", "unused" })
// Class[] types = new Class[9];
// assertArrayEquals(new TypeInformation[] { STRING_TYPE_INFO, BYTE_TYPE_INFO, Character.class, Double.class, Float.class,
// Integer.class, Long.class, Short.class, String.class },
// rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9));
new Tuple9<Boolean, Byte, Character, Double, Float, Integer, Long, Short, String>(
(Boolean) true, (Byte) (byte) 12, (Character) 'a', (Double) 12.5,
(Float) (float) 13.5, (Integer) 1234, (Long) 12345678900l,
(Short) (short) 12345, "something"));
ByteArrayOutputStream buff3 = new ByteArrayOutputStream();
DataOutputStream out3 = new DataOutputStream(buff3);
Long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
out3.write(rec.tupleBasicTypesToByteArray(rec.getTuple()));
out3.write(rec.tupleTypesToByteArray(rec.getTuple()));
}
DataInputStream in3 = new DataInputStream(new ByteArrayInputStream(buff3.toByteArray()));
for (int i = 0; i < 1000; i++) {
byte[] byteTypes = new byte[9];
in3.read(byteTypes);
String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9);
}
// System.out.println("Type copy with ByteArray:\t" + (System.nanoTime()
// - start) + " ns");
}
@Test
public void typeArrayCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord(
new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 },
new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l },
new Short[] { 12345 }, new String[] { "something" }));
@SuppressWarnings({ "rawtypes", "unused" })
// Class[] types = new Class[9];
// assertArrayEquals(new Class[] { Boolean[].class, Byte[].class, Character[].class, Double[].class,
// Float[].class, Integer[].class, Long[].class, Short[].class, String[].class },
// rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9));
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
new Long[] { 12345678900l }, new Short[] { 12345 },
new String[] { "something" }));
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
for (int i = 0; i < 10000; i++) {
out.write(rec.tupleBasicTypesToByteArray(rec.getTuple()));
out.write(rec.tupleTypesToByteArray(rec.getTuple()));
}
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord rec2 = new StreamRecord();
......@@ -446,22 +382,31 @@ public class StreamRecordTest {
for (int i = 0; i < 10000; i++) {
byte[] byteTypes = new byte[9];
in.read(byteTypes);
TypeInformation<?>[] basicTypes = rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
TypeInformation<?>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(basicTypes);
}
System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns");
start = System.nanoTime();
byte[] byteTypes = rec.tupleTypesToByteArray(rec.getTuple());
Tuple t = rec.getTuple();
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
byte[] byteTypes = new byte[9];
in.read(byteTypes);
// rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9);
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.parse(types2);
TypeInformation<?>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(basicTypes);
}
System.out.println("Type copy with String:\t\t" + (System.nanoTime() - start) + " ns");
System.out.println("Write with infoArray:\t\t" + (System.nanoTime() - start) + " ns");
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = (TupleTypeInfo<Tuple>) TypeExtractor.getForObject(t);
}
System.out.println("Write with extract:\t\t" + (System.nanoTime() - start) + " ns");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册