提交 a68d78fa 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Updated Tuple type serialization

上级 16d90f0d
...@@ -27,6 +27,8 @@ import java.util.ArrayList; ...@@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.avro.util.ByteBufferInputStream;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple10; import eu.stratosphere.api.java.tuple.Tuple10;
...@@ -50,6 +52,8 @@ import eu.stratosphere.api.java.tuple.Tuple6; ...@@ -50,6 +52,8 @@ import eu.stratosphere.api.java.tuple.Tuple6;
import eu.stratosphere.api.java.tuple.Tuple7; import eu.stratosphere.api.java.tuple.Tuple7;
import eu.stratosphere.api.java.tuple.Tuple8; import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9; import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.BasicArrayTypeInfo;
import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo; import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.TypeInformation; import eu.stratosphere.api.java.typeutils.TypeInformation;
...@@ -75,10 +79,9 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -75,10 +79,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private int numOfTuples; private int numOfTuples;
private int batchSize; private int batchSize;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class }; Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone // TODO implement equals, clone
...@@ -199,8 +202,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -199,8 +202,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
Tuple tuple; Tuple tuple;
try { try {
tuple = tupleBatch.get(tupleNumber); tuple = tupleBatch.get(tupleNumber);
...@@ -254,8 +256,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -254,8 +256,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* , NoSuchFieldException * , NoSuchFieldException
*/ */
// TODO: add exception for cast for all getters // TODO: add exception for cast for all getters
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Boolean) getField(tupleNumber, fieldNumber); return (Boolean) getField(tupleNumber, fieldNumber);
} }
...@@ -283,8 +284,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -283,8 +284,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Byte) getField(tupleNumber, fieldNumber); return (Byte) getField(tupleNumber, fieldNumber);
} }
...@@ -297,8 +297,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -297,8 +297,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Character getCharacter(int fieldNumber) throws NoSuchTupleException, public Character getCharacter(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return getCharacter(0, fieldNumber); return getCharacter(0, fieldNumber);
} }
...@@ -313,8 +312,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -313,8 +312,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Character) getField(tupleNumber, fieldNumber); return (Character) getField(tupleNumber, fieldNumber);
} }
...@@ -342,8 +340,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -342,8 +340,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Double) getField(tupleNumber, fieldNumber); return (Double) getField(tupleNumber, fieldNumber);
} }
...@@ -371,8 +368,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -371,8 +368,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Float) getField(tupleNumber, fieldNumber); return (Float) getField(tupleNumber, fieldNumber);
} }
...@@ -400,8 +396,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -400,8 +396,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Integer) getField(tupleNumber, fieldNumber); return (Integer) getField(tupleNumber, fieldNumber);
} }
...@@ -429,8 +424,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -429,8 +424,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Long) getField(tupleNumber, fieldNumber); return (Long) getField(tupleNumber, fieldNumber);
} }
...@@ -458,8 +452,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -458,8 +452,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , NoSuchFieldException * , NoSuchFieldException
*/ */
public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (Short) getField(tupleNumber, fieldNumber); return (Short) getField(tupleNumber, fieldNumber);
} }
...@@ -485,8 +478,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -485,8 +478,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the field in the tuple * Position of the field in the tuple
* @return value of the field as String * @return value of the field as String
*/ */
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException, public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
NoSuchFieldException {
return (String) getField(tupleNumber, fieldNumber); return (String) getField(tupleNumber, fieldNumber);
} }
...@@ -607,8 +599,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -607,8 +599,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value * New value
* @throws NoSuchFieldException * @throws NoSuchFieldException
*/ */
public void setCharacter(int tupleNumber, int fieldNumber, Character c) public void setCharacter(int tupleNumber, int fieldNumber, Character c) throws NoSuchFieldException {
throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, c); setField(tupleNumber, fieldNumber, c);
} }
...@@ -831,8 +822,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -831,8 +822,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , TupleSizeMismatchException * , TupleSizeMismatchException
*/ */
public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException, public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) { if (tuple.getArity() == numOfFields) {
try { try {
...@@ -871,8 +861,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -871,8 +861,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , TupleSizeMismatchException * , TupleSizeMismatchException
*/ */
public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException, public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) { if (tuple.getArity() == numOfFields) {
try { try {
tupleBatch.set(tupleNumber, copyTuple(tuple)); tupleBatch.set(tupleNumber, copyTuple(tuple));
...@@ -930,19 +919,14 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -930,19 +919,14 @@ public class StreamRecord implements IOReadableWritable, Serializable {
} }
} }
public StreamRecord copySerialized() { public StreamRecord copySerialized() throws IOException {
ByteArrayOutputStream buff = new ByteArrayOutputStream(); ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff); DataOutputStream out = new DataOutputStream(buff);
StreamRecord newRecord = new StreamRecord(); StreamRecord newRecord = new StreamRecord();
try { this.write(out);
this.write(out); DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
newRecord.read(in);
} catch (Exception e) {
}
newRecord.read(in);
return newRecord; return newRecord;
} }
...@@ -970,6 +954,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -970,6 +954,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tuple * @param tuple
* Tuple to copy * Tuple to copy
* @return Copy of the tuple * @return Copy of the tuple
* @throws IllegalAccessException
* @throws InstantiationException
*/ */
public static Tuple copyTuple(Tuple tuple) { public static Tuple copyTuple(Tuple tuple) {
// TODO: implement deep copy for arrays // TODO: implement deep copy for arrays
...@@ -977,14 +963,50 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -977,14 +963,50 @@ public class StreamRecord implements IOReadableWritable, Serializable {
Tuple newTuple = null; Tuple newTuple = null;
try { try {
newTuple = (Tuple) CLASSES[numofFields - 1].newInstance(); newTuple = (Tuple) CLASSES[numofFields - 1].newInstance();
} catch (Exception e) {
}
for (int i = 0; i < numofFields; i++) { for (int i = 0; i < numofFields; i++) {
newTuple.setField(tuple.getField(i), i); Class<? extends Object> type = tuple.getField(i).getClass();
if (type.isArray()) {
if (type.equals(Boolean[].class)) {
Boolean[] arr = (Boolean[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Byte[].class)) {
Byte[] arr = (Byte[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Character[].class)) {
Character[] arr = (Character[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Double[].class)) {
Double[] arr = (Double[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Float[].class)) {
Float[] arr = (Float[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Integer[].class)) {
Integer[] arr = (Integer[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Long[].class)) {
Long[] arr = (Long[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Short[].class)) {
Short[] arr = (Short[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(String[].class)) {
String[] arr = (String[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
}
newTuple.setField(tuple.getField(i), i);
} else {
newTuple.setField(tuple.getField(i), i);
}
}
} catch (InstantiationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
return newTuple; return newTuple;
} }
...@@ -1047,73 +1069,72 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -1047,73 +1069,72 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return Class array of field types * @return Class array of field types
*/ */
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Class[] tupleBasicTypesFromByteArray(byte[] representation, int numberOfFields) { TypeInformation[] tupleBasicTypesFromByteArray(byte[] representation, int numberOfFields) {
Class[] basicTypes = new Class[representation.length]; TypeInformation[] basicTypes = new TypeInformation[representation.length];
for (int i = 0; i < basicTypes.length; i++) { for (int i = 0; i < basicTypes.length; i++) {
switch (representation[i]) { switch (representation[i]) {
case 0: case 0:
basicTypes[i] = java.lang.Boolean.class; basicTypes[i] = BasicTypeInfo.BOOLEAN_TYPE_INFO;
break; break;
case 1: case 1:
basicTypes[i] = java.lang.Byte.class; basicTypes[i] = BasicTypeInfo.BYTE_TYPE_INFO;
break; break;
case 2: case 2:
basicTypes[i] = java.lang.Character.class; basicTypes[i] = BasicTypeInfo.CHAR_TYPE_INFO;
break; break;
case 3: case 3:
basicTypes[i] = java.lang.Double.class; basicTypes[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
break; break;
case 4: case 4:
basicTypes[i] = java.lang.Float.class; basicTypes[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
break; break;
case 5: case 5:
basicTypes[i] = java.lang.Integer.class; basicTypes[i] = BasicTypeInfo.INT_TYPE_INFO;
break; break;
case 6: case 6:
basicTypes[i] = java.lang.Long.class; basicTypes[i] = BasicTypeInfo.LONG_TYPE_INFO;
break; break;
case 7: case 7:
basicTypes[i] = java.lang.Short.class; basicTypes[i] = BasicTypeInfo.SHORT_TYPE_INFO;
break; break;
case 8: case 8:
basicTypes[i] = java.lang.String.class; basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
break; break;
case 9: case 9:
basicTypes[i] = java.lang.Boolean[].class; basicTypes[i] = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
break; break;
case 10: case 10:
basicTypes[i] = java.lang.Byte[].class; basicTypes[i] = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
break; break;
case 11: case 11:
basicTypes[i] = java.lang.Character[].class; basicTypes[i] = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
break; break;
case 12: case 12:
basicTypes[i] = java.lang.Double[].class; basicTypes[i] = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
break; break;
case 13: case 13:
basicTypes[i] = java.lang.Float[].class; basicTypes[i] = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
break; break;
case 14: case 14:
basicTypes[i] = java.lang.Integer[].class; basicTypes[i] = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
break; break;
case 15: case 15:
basicTypes[i] = java.lang.Long[].class; basicTypes[i] = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
break; break;
case 16: case 16:
basicTypes[i] = java.lang.Short[].class; basicTypes[i] = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
break; break;
case 17: case 17:
basicTypes[i] = java.lang.String[].class; basicTypes[i] = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
break; break;
default: default:
basicTypes[i] = java.lang.String.class; basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
break; break;
} }
} }
return basicTypes; return basicTypes;
} }
// TODO: implement basic arrays (int[], long[]...)
static String typeStringFromByteArray(byte[] representation, int numberOfFields) { static String typeStringFromByteArray(byte[] representation, int numberOfFields) {
StringBuilder typeInfo = new StringBuilder("Tuple"); StringBuilder typeInfo = new StringBuilder("Tuple");
typeInfo.append(numberOfFields + "<"); typeInfo.append(numberOfFields + "<");
...@@ -1174,6 +1195,30 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -1174,6 +1195,30 @@ public class StreamRecord implements IOReadableWritable, Serializable {
case 17: case 17:
typeInfo.append("String[],"); typeInfo.append("String[],");
break; break;
case 18:
typeInfo.append("boolean[],");
break;
case 19:
typeInfo.append("byte[],");
break;
case 20:
typeInfo.append("char[],");
break;
case 21:
typeInfo.append("double[],");
break;
case 22:
typeInfo.append("float[],");
break;
case 23:
typeInfo.append("int[],");
break;
case 24:
typeInfo.append("long[],");
break;
case 25:
typeInfo.append("short[],");
break;
default: default:
typeInfo.append("String,"); typeInfo.append("String,");
break; break;
...@@ -1192,26 +1237,22 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -1192,26 +1237,22 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Tuple to be written * Tuple to be written
* @param out * @param out
* Output chosen * Output chosen
* @throws IOException
*/ */
private void writeTuple(Tuple tuple, DataOutput out) { private void writeTuple(Tuple tuple, DataOutput out) throws IOException {
// TODO: exception for empty record - no getField // TODO: exception for empty record - no getField
// TODO: better serialization logic // TODO: better serialization logic
byte[] typeArray = tupleBasicTypesToByteArray(getTuple()); byte[] typeArray = tupleBasicTypesToByteArray(getTuple());
TypeInformation<? extends Tuple> typeInfo = TypeExtractor.getForObject(getTuple()); TypeInformation<? extends Tuple> typeInfo = TypeExtractor.getForObject(getTuple());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();
.createSerializer(); SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(tupleSerializer);
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(
tupleSerializer);
serializationDelegate.setInstance(tuple); serializationDelegate.setInstance(tuple);
try {
out.writeInt(numOfFields); out.writeInt(numOfFields);
out.write(typeArray); out.write(typeArray);
serializationDelegate.write(out); serializationDelegate.write(out);
} catch (IOException e) {
e.printStackTrace();
}
} }
/** /**
...@@ -1228,17 +1269,15 @@ public class StreamRecord implements IOReadableWritable, Serializable { ...@@ -1228,17 +1269,15 @@ public class StreamRecord implements IOReadableWritable, Serializable {
byte[] typesInByte = new byte[numberOfFields]; byte[] typesInByte = new byte[numberOfFields];
in.readFully(typesInByte, 0, numberOfFields); in.readFully(typesInByte, 0, numberOfFields);
// @SuppressWarnings("rawtypes") // @SuppressWarnings("rawtypes")
// Class[] basicTypes = tupleBasicTypesFromByteArray(typesInByte, TypeInformation<?>[] basicTypes = tupleBasicTypesFromByteArray(typesInByte, numberOfFields);
// numberOfFields);
// TODO:skip this part somehow // TODO:skip this part somehow
String typeString = typeStringFromByteArray(typesInByte, numberOfFields); //String typeString = typeStringFromByteArray(typesInByte, numberOfFields);
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.parse(typeString); TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
@SuppressWarnings("unchecked") TupleSerializer<Tuple> tupleSerializer = typeInfo.createSerializer();
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer); DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
dd.setInstance(tupleSerializer.createInstance()); dd.setInstance(tupleSerializer.createInstance());
dd.read(in); dd.read(in);
return dd.getInstance(); return dd.getInstance();
} }
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.basictopology; package eu.stratosphere.streaming.examples.basictopology;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -20,7 +34,7 @@ public class BasicTopology { ...@@ -20,7 +34,7 @@ public class BasicTopology {
public static class BasicSource extends UserSourceInvokable { public static class BasicSource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String>("streaming")); StreamRecord record = new StreamRecord(new Tuple1<String[]>(new String[] {"streaming", "flink"}));
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml; package eu.stratosphere.streaming.examples.ml;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
......
...@@ -26,7 +26,9 @@ import java.io.ByteArrayOutputStream; ...@@ -26,7 +26,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
...@@ -34,22 +36,18 @@ import eu.stratosphere.api.java.tuple.Tuple1; ...@@ -34,22 +36,18 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5; import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9; import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo; import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.TypeInformation; import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.StringValue;
public class StreamRecordTest { public class StreamRecordTest {
@Test @Test
public void singleRecordSetGetTest() { public void singleRecordSetGetTest() {
StreamRecord record = new StreamRecord( StreamRecord record = new StreamRecord(
new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>( new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>("Stratosphere", 1,
"Stratosphere", 1, 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42)); 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42));
assertEquals(9, record.getNumOfFields()); assertEquals(9, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples()); assertEquals(1, record.getNumOfTuples());
...@@ -168,8 +166,7 @@ public class StreamRecordTest { ...@@ -168,8 +166,7 @@ public class StreamRecordTest {
assertEquals(false, record.getBoolean(0, 3)); assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4)); assertEquals((Double) 0., record.getDouble(0, 4));
record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, 2L, true, 3.5));
2L, true, 3.5));
assertEquals("Stratosphere", record.getString(1, 0)); assertEquals("Stratosphere", record.getString(1, 0));
assertEquals((Integer) 1, record.getInteger(1, 1)); assertEquals((Integer) 1, record.getInteger(1, 1));
...@@ -187,8 +184,7 @@ public class StreamRecordTest { ...@@ -187,8 +184,7 @@ public class StreamRecordTest {
assertEquals(false, record.getBoolean(0, 3)); assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4)); assertEquals((Double) 0., record.getDouble(0, 4));
record.addTuple(0, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, record.addTuple(0, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, 2L, true, 3.5));
2L, true, 3.5));
assertEquals(2, record.getNumOfTuples()); assertEquals(2, record.getNumOfTuples());
...@@ -201,7 +197,7 @@ public class StreamRecordTest { ...@@ -201,7 +197,7 @@ public class StreamRecordTest {
} }
@Test @Test
public void copyTest() { public void copyTest() throws IOException {
StreamRecord a = new StreamRecord(new Tuple1<String>("Big")); StreamRecord a = new StreamRecord(new Tuple1<String>("Big"));
a.setId(0); a.setId(0);
StreamRecord b = a.copy(); StreamRecord b = a.copy();
...@@ -320,8 +316,7 @@ public class StreamRecordTest { ...@@ -320,8 +316,7 @@ public class StreamRecordTest {
int num = 42; int num = 42;
String str = "above clouds"; String str = "above clouds";
Integer[] intArray = new Integer[] { 1, 2 }; Integer[] intArray = new Integer[] { 1, 2 };
StreamRecord rec = new StreamRecord(new Tuple3<Integer, String, Integer[]>(num, str, StreamRecord rec = new StreamRecord(new Tuple3<Integer, String, Integer[]>(num, str, intArray));
intArray));
try { try {
rec.write(out); rec.write(out);
...@@ -330,8 +325,7 @@ public class StreamRecordTest { ...@@ -330,8 +325,7 @@ public class StreamRecordTest {
StreamRecord newRec = new StreamRecord(); StreamRecord newRec = new StreamRecord();
newRec.read(in); newRec.read(in);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Tuple3<Integer, String, Integer[]> tupleOut = (Tuple3<Integer, String, Integer[]>) newRec Tuple3<Integer, String, Integer[]> tupleOut = (Tuple3<Integer, String, Integer[]>) newRec.getTuple(0);
.getTuple(0);
assertEquals(tupleOut.getField(0), 42); assertEquals(tupleOut.getField(0), 42);
assertEquals(str, tupleOut.getField(1)); assertEquals(str, tupleOut.getField(1));
...@@ -345,36 +339,71 @@ public class StreamRecordTest { ...@@ -345,36 +339,71 @@ public class StreamRecordTest {
@Test @Test
public void tupleCopyTest() { public void tupleCopyTest() {
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("a", 1); Tuple3<String, Integer, Double[]> t1 = new Tuple3<String, Integer, Double[]>("a", 1, new Double[]{ 4.2 });
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Tuple2 t2 = (Tuple2) StreamRecord.copyTuple(t1); Tuple3 t2 = (Tuple3) StreamRecord.copyTuple(t1);
assertEquals("a", t2.getField(0)); assertEquals("a", t2.getField(0));
assertEquals(1, t2.getField(1)); assertEquals(1, t2.getField(1));
assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2));
t1.setField(2, 1); t1.setField(2, 1);
assertEquals(1, t2.getField(1)); assertEquals(1, t2.getField(1));
assertEquals(2, t1.getField(1)); assertEquals(2, t1.getField(1));
t1.setField(new Double[] {3.14}, 2);
assertArrayEquals(new Double[] {3.14}, (Double[]) t1.getField(2));
assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass()); assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass()); assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
}
@Test
public void tupleArraySerializationTest() throws IOException {
Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]> t1 = new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 },
new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l }, new Short[] { 12345 },
new String[] { "something" });
StreamRecord s1 = new StreamRecord(t1);
StreamRecord s2 = s1.copySerialized();
Tuple9 t2 = (Tuple9) s2.getTuple();
assertArrayEquals(new Boolean[] { true }, (Boolean[]) t2.getField(0));
assertArrayEquals(new Byte[] { 12 }, (Byte[]) t2.getField(1));
assertArrayEquals(new Character[] { 'a' }, (Character[]) t2.getField(2));
assertArrayEquals(new Double[] { 12.5 }, (Double[]) t2.getField(3));
assertArrayEquals(new Float[] { 13.5f }, (Float[]) t2.getField(4));
assertArrayEquals(new Integer[] { 1234 }, (Integer[]) t2.getField(5));
assertArrayEquals(new Long[] { 12345678900l }, (Long[]) t2.getField(6));
assertArrayEquals(new Short[] { 12345 }, (Short[]) t2.getField(7));
assertArrayEquals(new String[] { "something" }, (String[]) t2.getField(8));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
assertEquals(t1.getField(2).getClass(), t2.getField(2).getClass());
assertEquals(t1.getField(3).getClass(), t2.getField(3).getClass());
assertEquals(t1.getField(4).getClass(), t2.getField(4).getClass());
assertEquals(t1.getField(5).getClass(), t2.getField(5).getClass());
assertEquals(t1.getField(6).getClass(), t2.getField(6).getClass());
assertEquals(t1.getField(7).getClass(), t2.getField(7).getClass());
assertEquals(t1.getField(8).getClass(), t2.getField(8).getClass());
} }
//TODO:measure performance of different serialization logics // TODO:measure performance of different serialization logics
@Test @Test
public void typeCopyTest() throws NoSuchTupleException, IOException { public void typeCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord( StreamRecord rec = new StreamRecord(
new Tuple9<Boolean, Byte, Character, Double, Float, Integer, Long, Short, String>( new Tuple9<Boolean, Byte, Character, Double, Float, Integer, Long, Short, String>((Boolean) true,
(Boolean) true, (Byte) (byte) 12, (Character) 'a', (Double) 12.5, (Byte) (byte) 12, (Character) 'a', (Double) 12.5, (Float) (float) 13.5, (Integer) 1234,
(Float) (float) 13.5, (Integer) 1234, (Long) 12345678900l, (Long) 12345678900l, (Short) (short) 12345, "something"));
(Short) (short) 12345, "something"));
@SuppressWarnings({ "rawtypes", "unused" }) @SuppressWarnings({ "rawtypes", "unused" })
Class[] types = new Class[9]; // Class[] types = new Class[9];
assertArrayEquals(new Class[] { Boolean.class, Byte.class, Character.class, Double.class, // assertArrayEquals(new TypeInformation[] { STRING_TYPE_INFO, BYTE_TYPE_INFO, Character.class, Double.class, Float.class,
Float.class, Integer.class, Long.class, Short.class, String.class }, // Integer.class, Long.class, Short.class, String.class },
rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9)); // rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9));
ByteArrayOutputStream buff3 = new ByteArrayOutputStream(); ByteArrayOutputStream buff3 = new ByteArrayOutputStream();
DataOutputStream out3 = new DataOutputStream(buff3); DataOutputStream out3 = new DataOutputStream(buff3);
...@@ -388,7 +417,51 @@ public class StreamRecordTest { ...@@ -388,7 +417,51 @@ public class StreamRecordTest {
in3.read(byteTypes); in3.read(byteTypes);
String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9); String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9);
} }
// System.out.println("Type copy with ByteArray:\t" + (System.nanoTime()
// - start) + " ns");
}
@Test
public void typeArrayCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord(
new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 },
new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l },
new Short[] { 12345 }, new String[] { "something" }));
@SuppressWarnings({ "rawtypes", "unused" })
// Class[] types = new Class[9];
// assertArrayEquals(new Class[] { Boolean[].class, Byte[].class, Character[].class, Double[].class,
// Float[].class, Integer[].class, Long[].class, Short[].class, String[].class },
// rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9));
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
for (int i = 0; i < 10000; i++) {
out.write(rec.tupleBasicTypesToByteArray(rec.getTuple()));
}
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord rec2 = new StreamRecord();
Long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
byte[] byteTypes = new byte[9];
in.read(byteTypes);
TypeInformation<?>[] basicTypes = rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
}
System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns"); System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns");
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
byte[] byteTypes = new byte[9];
in.read(byteTypes);
// rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9);
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.parse(types2);
}
System.out.println("Type copy with String:\t\t" + (System.nanoTime() - start) + " ns");
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册