From a68d78fae67135ffdf17e7f2ac18040b68cde3f2 Mon Sep 17 00:00:00 2001 From: gaborhermann Date: Mon, 14 Jul 2014 16:29:09 +0200 Subject: [PATCH] [streaming] Updated Tuple type serialization --- .../api/streamrecord/StreamRecord.java | 215 +++++++++++------- .../examples/basictopology/BasicTopology.java | 16 +- .../examples/ml/IncrementalLearning.java | 14 ++ .../api/streamrecord/StreamRecordTest.java | 131 ++++++++--- 4 files changed, 258 insertions(+), 118 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 523b9b4b073..09a56b3af44 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.avro.util.ByteBufferInputStream; + import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple10; @@ -50,6 +52,8 @@ import eu.stratosphere.api.java.tuple.Tuple6; import eu.stratosphere.api.java.tuple.Tuple7; import eu.stratosphere.api.java.tuple.Tuple8; import eu.stratosphere.api.java.tuple.Tuple9; +import eu.stratosphere.api.java.typeutils.BasicArrayTypeInfo; +import eu.stratosphere.api.java.typeutils.BasicTypeInfo; import eu.stratosphere.api.java.typeutils.TupleTypeInfo; import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.api.java.typeutils.TypeInformation; @@ -75,10 +79,9 @@ public class StreamRecord implements IOReadableWritable, Serializable { private int numOfTuples; private int batchSize; - private static final Class[] CLASSES = new Class[] { Tuple1.class, Tuple2.class, - Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, - Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, - Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, + private static final Class[] CLASSES = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, + Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, + Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class }; // TODO implement equals, clone @@ -199,8 +202,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { Tuple tuple; try { tuple = tupleBatch.get(tupleNumber); @@ -254,8 +256,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * , NoSuchFieldException */ // TODO: add exception for cast for all getters - public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Boolean) getField(tupleNumber, fieldNumber); } @@ -283,8 +284,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Byte) getField(tupleNumber, fieldNumber); } @@ -297,8 +297,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Character getCharacter(int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Character getCharacter(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return getCharacter(0, fieldNumber); } @@ -313,8 +312,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Character) getField(tupleNumber, fieldNumber); } @@ -342,8 +340,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Double) getField(tupleNumber, fieldNumber); } @@ -371,8 +368,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Float) getField(tupleNumber, fieldNumber); } @@ -400,8 +396,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Integer) getField(tupleNumber, fieldNumber); } @@ -429,8 +424,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Long) getField(tupleNumber, fieldNumber); } @@ -458,8 +452,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , NoSuchFieldException */ - public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (Short) getField(tupleNumber, fieldNumber); } @@ -485,8 +478,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Position of the field in the tuple * @return value of the field as String */ - public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException, - NoSuchFieldException { + public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { return (String) getField(tupleNumber, fieldNumber); } @@ -607,8 +599,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * New value * @throws NoSuchFieldException */ - public void setCharacter(int tupleNumber, int fieldNumber, Character c) - throws NoSuchFieldException { + public void setCharacter(int tupleNumber, int fieldNumber, Character c) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, c); } @@ -831,8 +822,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , TupleSizeMismatchException */ - public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException, - TupleSizeMismatchException { + public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException { if (tuple.getArity() == numOfFields) { try { @@ -871,8 +861,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @throws NoSuchTupleException * , TupleSizeMismatchException */ - public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException, - TupleSizeMismatchException { + public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException { if (tuple.getArity() == numOfFields) { try { tupleBatch.set(tupleNumber, copyTuple(tuple)); @@ -930,19 +919,14 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - public StreamRecord copySerialized() { - + public StreamRecord copySerialized() throws IOException { ByteArrayOutputStream buff = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(buff); StreamRecord newRecord = new StreamRecord(); - try { - this.write(out); - DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray())); - - newRecord.read(in); - } catch (Exception e) { - } + this.write(out); + DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray())); + newRecord.read(in); return newRecord; } @@ -970,6 +954,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param tuple * Tuple to copy * @return Copy of the tuple + * @throws IllegalAccessException + * @throws InstantiationException */ public static Tuple copyTuple(Tuple tuple) { // TODO: implement deep copy for arrays @@ -977,14 +963,50 @@ public class StreamRecord implements IOReadableWritable, Serializable { Tuple newTuple = null; try { newTuple = (Tuple) CLASSES[numofFields - 1].newInstance(); - } catch (Exception e) { - - } - for (int i = 0; i < numofFields; i++) { - newTuple.setField(tuple.getField(i), i); + for (int i = 0; i < numofFields; i++) { + Class type = tuple.getField(i).getClass(); + if (type.isArray()) { + if (type.equals(Boolean[].class)) { + Boolean[] arr = (Boolean[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Byte[].class)) { + Byte[] arr = (Byte[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Character[].class)) { + Character[] arr = (Character[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Double[].class)) { + Double[] arr = (Double[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Float[].class)) { + Float[] arr = (Float[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Integer[].class)) { + Integer[] arr = (Integer[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Long[].class)) { + Long[] arr = (Long[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(Short[].class)) { + Short[] arr = (Short[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } else if (type.equals(String[].class)) { + String[] arr = (String[]) tuple.getField(i); + newTuple.setField(Arrays.copyOf(arr, arr.length), i); + } + newTuple.setField(tuple.getField(i), i); + } else { + newTuple.setField(tuple.getField(i), i); + } + } + } catch (InstantiationException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IllegalAccessException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - return newTuple; } @@ -1047,73 +1069,72 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @return Class array of field types */ @SuppressWarnings("rawtypes") - Class[] tupleBasicTypesFromByteArray(byte[] representation, int numberOfFields) { - Class[] basicTypes = new Class[representation.length]; + TypeInformation[] tupleBasicTypesFromByteArray(byte[] representation, int numberOfFields) { + TypeInformation[] basicTypes = new TypeInformation[representation.length]; for (int i = 0; i < basicTypes.length; i++) { switch (representation[i]) { case 0: - basicTypes[i] = java.lang.Boolean.class; + basicTypes[i] = BasicTypeInfo.BOOLEAN_TYPE_INFO; break; case 1: - basicTypes[i] = java.lang.Byte.class; + basicTypes[i] = BasicTypeInfo.BYTE_TYPE_INFO; break; case 2: - basicTypes[i] = java.lang.Character.class; + basicTypes[i] = BasicTypeInfo.CHAR_TYPE_INFO; break; case 3: - basicTypes[i] = java.lang.Double.class; + basicTypes[i] = BasicTypeInfo.DOUBLE_TYPE_INFO; break; case 4: - basicTypes[i] = java.lang.Float.class; + basicTypes[i] = BasicTypeInfo.FLOAT_TYPE_INFO; break; case 5: - basicTypes[i] = java.lang.Integer.class; + basicTypes[i] = BasicTypeInfo.INT_TYPE_INFO; break; case 6: - basicTypes[i] = java.lang.Long.class; + basicTypes[i] = BasicTypeInfo.LONG_TYPE_INFO; break; case 7: - basicTypes[i] = java.lang.Short.class; + basicTypes[i] = BasicTypeInfo.SHORT_TYPE_INFO; break; case 8: - basicTypes[i] = java.lang.String.class; + basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO; break; case 9: - basicTypes[i] = java.lang.Boolean[].class; + basicTypes[i] = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO; break; case 10: - basicTypes[i] = java.lang.Byte[].class; + basicTypes[i] = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; break; case 11: - basicTypes[i] = java.lang.Character[].class; + basicTypes[i] = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO; break; case 12: - basicTypes[i] = java.lang.Double[].class; + basicTypes[i] = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO; break; case 13: - basicTypes[i] = java.lang.Float[].class; + basicTypes[i] = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO; break; case 14: - basicTypes[i] = java.lang.Integer[].class; + basicTypes[i] = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO; break; case 15: - basicTypes[i] = java.lang.Long[].class; + basicTypes[i] = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO; break; case 16: - basicTypes[i] = java.lang.Short[].class; + basicTypes[i] = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO; break; case 17: - basicTypes[i] = java.lang.String[].class; + basicTypes[i] = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO; break; default: - basicTypes[i] = java.lang.String.class; + basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO; break; } } return basicTypes; } - // TODO: implement basic arrays (int[], long[]...) static String typeStringFromByteArray(byte[] representation, int numberOfFields) { StringBuilder typeInfo = new StringBuilder("Tuple"); typeInfo.append(numberOfFields + "<"); @@ -1174,6 +1195,30 @@ public class StreamRecord implements IOReadableWritable, Serializable { case 17: typeInfo.append("String[],"); break; + case 18: + typeInfo.append("boolean[],"); + break; + case 19: + typeInfo.append("byte[],"); + break; + case 20: + typeInfo.append("char[],"); + break; + case 21: + typeInfo.append("double[],"); + break; + case 22: + typeInfo.append("float[],"); + break; + case 23: + typeInfo.append("int[],"); + break; + case 24: + typeInfo.append("long[],"); + break; + case 25: + typeInfo.append("short[],"); + break; default: typeInfo.append("String,"); break; @@ -1192,26 +1237,22 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Tuple to be written * @param out * Output chosen + * @throws IOException */ - private void writeTuple(Tuple tuple, DataOutput out) { + private void writeTuple(Tuple tuple, DataOutput out) throws IOException { // TODO: exception for empty record - no getField // TODO: better serialization logic byte[] typeArray = tupleBasicTypesToByteArray(getTuple()); TypeInformation typeInfo = TypeExtractor.getForObject(getTuple()); @SuppressWarnings("unchecked") - TupleSerializer tupleSerializer = (TupleSerializer) typeInfo - .createSerializer(); - SerializationDelegate serializationDelegate = new SerializationDelegate( - tupleSerializer); + TupleSerializer tupleSerializer = (TupleSerializer) typeInfo.createSerializer(); + SerializationDelegate serializationDelegate = new SerializationDelegate(tupleSerializer); serializationDelegate.setInstance(tuple); - try { - out.writeInt(numOfFields); - out.write(typeArray); - serializationDelegate.write(out); - } catch (IOException e) { - e.printStackTrace(); - } + + out.writeInt(numOfFields); + out.write(typeArray); + serializationDelegate.write(out); } /** @@ -1228,17 +1269,15 @@ public class StreamRecord implements IOReadableWritable, Serializable { byte[] typesInByte = new byte[numberOfFields]; in.readFully(typesInByte, 0, numberOfFields); // @SuppressWarnings("rawtypes") - // Class[] basicTypes = tupleBasicTypesFromByteArray(typesInByte, - // numberOfFields); + TypeInformation[] basicTypes = tupleBasicTypesFromByteArray(typesInByte, numberOfFields); // TODO:skip this part somehow - String typeString = typeStringFromByteArray(typesInByte, numberOfFields); - TypeInformation typeInfo = TupleTypeInfo.parse(typeString); - @SuppressWarnings("unchecked") - TupleSerializer tupleSerializer = (TupleSerializer) typeInfo - .createSerializer(); + //String typeString = typeStringFromByteArray(typesInByte, numberOfFields); + TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes); + TupleSerializer tupleSerializer = typeInfo.createSerializer(); DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer); dd.setInstance(tupleSerializer.createInstance()); + dd.read(in); return dd.getInstance(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java index d3e0c84eb26..243fc475c6c 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/basictopology/BasicTopology.java @@ -1,3 +1,17 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ package eu.stratosphere.streaming.examples.basictopology; import java.net.InetSocketAddress; @@ -20,7 +34,7 @@ public class BasicTopology { public static class BasicSource extends UserSourceInvokable { - StreamRecord record = new StreamRecord(new Tuple1("streaming")); + StreamRecord record = new StreamRecord(new Tuple1(new String[] {"streaming", "flink"})); @Override public void invoke() throws Exception { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearning.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearning.java index 8253735342e..da2e663abf4 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearning.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearning.java @@ -1,3 +1,17 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ package eu.stratosphere.streaming.examples.ml; import java.net.InetSocketAddress; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index 0247823ceda..dfb31dc88d3 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -26,7 +26,9 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; +import org.junit.Ignore; import org.junit.Test; import eu.stratosphere.api.java.tuple.Tuple; @@ -34,22 +36,18 @@ import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple5; +import eu.stratosphere.api.java.tuple.Tuple8; import eu.stratosphere.api.java.tuple.Tuple9; import eu.stratosphere.api.java.typeutils.TupleTypeInfo; -import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.api.java.typeutils.TypeInformation; -import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; -import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; -import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; -import eu.stratosphere.types.StringValue; public class StreamRecordTest { @Test public void singleRecordSetGetTest() { StreamRecord record = new StreamRecord( - new Tuple9( - "Stratosphere", 1, 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42)); + new Tuple9("Stratosphere", 1, + 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42)); assertEquals(9, record.getNumOfFields()); assertEquals(1, record.getNumOfTuples()); @@ -168,8 +166,7 @@ public class StreamRecordTest { assertEquals(false, record.getBoolean(0, 3)); assertEquals((Double) 0., record.getDouble(0, 4)); - record.setTuple(1, new Tuple5("Stratosphere", 1, - 2L, true, 3.5)); + record.setTuple(1, new Tuple5("Stratosphere", 1, 2L, true, 3.5)); assertEquals("Stratosphere", record.getString(1, 0)); assertEquals((Integer) 1, record.getInteger(1, 1)); @@ -187,8 +184,7 @@ public class StreamRecordTest { assertEquals(false, record.getBoolean(0, 3)); assertEquals((Double) 0., record.getDouble(0, 4)); - record.addTuple(0, new Tuple5("Stratosphere", 1, - 2L, true, 3.5)); + record.addTuple(0, new Tuple5("Stratosphere", 1, 2L, true, 3.5)); assertEquals(2, record.getNumOfTuples()); @@ -201,7 +197,7 @@ public class StreamRecordTest { } @Test - public void copyTest() { + public void copyTest() throws IOException { StreamRecord a = new StreamRecord(new Tuple1("Big")); a.setId(0); StreamRecord b = a.copy(); @@ -320,8 +316,7 @@ public class StreamRecordTest { int num = 42; String str = "above clouds"; Integer[] intArray = new Integer[] { 1, 2 }; - StreamRecord rec = new StreamRecord(new Tuple3(num, str, - intArray)); + StreamRecord rec = new StreamRecord(new Tuple3(num, str, intArray)); try { rec.write(out); @@ -330,8 +325,7 @@ public class StreamRecordTest { StreamRecord newRec = new StreamRecord(); newRec.read(in); @SuppressWarnings("unchecked") - Tuple3 tupleOut = (Tuple3) newRec - .getTuple(0); + Tuple3 tupleOut = (Tuple3) newRec.getTuple(0); assertEquals(tupleOut.getField(0), 42); assertEquals(str, tupleOut.getField(1)); @@ -345,36 +339,71 @@ public class StreamRecordTest { @Test public void tupleCopyTest() { - Tuple2 t1 = new Tuple2("a", 1); + Tuple3 t1 = new Tuple3("a", 1, new Double[]{ 4.2 }); @SuppressWarnings("rawtypes") - Tuple2 t2 = (Tuple2) StreamRecord.copyTuple(t1); + Tuple3 t2 = (Tuple3) StreamRecord.copyTuple(t1); assertEquals("a", t2.getField(0)); assertEquals(1, t2.getField(1)); + assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2)); t1.setField(2, 1); assertEquals(1, t2.getField(1)); assertEquals(2, t1.getField(1)); - + + t1.setField(new Double[] {3.14}, 2); + assertArrayEquals(new Double[] {3.14}, (Double[]) t1.getField(2)); + assertArrayEquals(new Double[] {4.2}, (Double[]) t2.getField(2)); + assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass()); assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass()); + } + + @Test + public void tupleArraySerializationTest() throws IOException { + Tuple9 t1 = new Tuple9( + new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 }, + new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l }, new Short[] { 12345 }, + new String[] { "something" }); + + StreamRecord s1 = new StreamRecord(t1); + StreamRecord s2 = s1.copySerialized(); + Tuple9 t2 = (Tuple9) s2.getTuple(); + + assertArrayEquals(new Boolean[] { true }, (Boolean[]) t2.getField(0)); + assertArrayEquals(new Byte[] { 12 }, (Byte[]) t2.getField(1)); + assertArrayEquals(new Character[] { 'a' }, (Character[]) t2.getField(2)); + assertArrayEquals(new Double[] { 12.5 }, (Double[]) t2.getField(3)); + assertArrayEquals(new Float[] { 13.5f }, (Float[]) t2.getField(4)); + assertArrayEquals(new Integer[] { 1234 }, (Integer[]) t2.getField(5)); + assertArrayEquals(new Long[] { 12345678900l }, (Long[]) t2.getField(6)); + assertArrayEquals(new Short[] { 12345 }, (Short[]) t2.getField(7)); + assertArrayEquals(new String[] { "something" }, (String[]) t2.getField(8)); + assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass()); + assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass()); + assertEquals(t1.getField(2).getClass(), t2.getField(2).getClass()); + assertEquals(t1.getField(3).getClass(), t2.getField(3).getClass()); + assertEquals(t1.getField(4).getClass(), t2.getField(4).getClass()); + assertEquals(t1.getField(5).getClass(), t2.getField(5).getClass()); + assertEquals(t1.getField(6).getClass(), t2.getField(6).getClass()); + assertEquals(t1.getField(7).getClass(), t2.getField(7).getClass()); + assertEquals(t1.getField(8).getClass(), t2.getField(8).getClass()); } - //TODO:measure performance of different serialization logics + // TODO:measure performance of different serialization logics @Test public void typeCopyTest() throws NoSuchTupleException, IOException { StreamRecord rec = new StreamRecord( - new Tuple9( - (Boolean) true, (Byte) (byte) 12, (Character) 'a', (Double) 12.5, - (Float) (float) 13.5, (Integer) 1234, (Long) 12345678900l, - (Short) (short) 12345, "something")); + new Tuple9((Boolean) true, + (Byte) (byte) 12, (Character) 'a', (Double) 12.5, (Float) (float) 13.5, (Integer) 1234, + (Long) 12345678900l, (Short) (short) 12345, "something")); @SuppressWarnings({ "rawtypes", "unused" }) - Class[] types = new Class[9]; - assertArrayEquals(new Class[] { Boolean.class, Byte.class, Character.class, Double.class, - Float.class, Integer.class, Long.class, Short.class, String.class }, - rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9)); +// Class[] types = new Class[9]; +// assertArrayEquals(new TypeInformation[] { STRING_TYPE_INFO, BYTE_TYPE_INFO, Character.class, Double.class, Float.class, +// Integer.class, Long.class, Short.class, String.class }, +// rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9)); ByteArrayOutputStream buff3 = new ByteArrayOutputStream(); DataOutputStream out3 = new DataOutputStream(buff3); @@ -388,7 +417,51 @@ public class StreamRecordTest { in3.read(byteTypes); String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9); } + // System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() + // - start) + " ns"); + } + + @Test + public void typeArrayCopyTest() throws NoSuchTupleException, IOException { + StreamRecord rec = new StreamRecord( + new Tuple9( + new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' }, new Double[] { 12.5 }, + new Float[] { 13.5f }, new Integer[] { 1234 }, new Long[] { 12345678900l }, + new Short[] { 12345 }, new String[] { "something" })); + + @SuppressWarnings({ "rawtypes", "unused" }) +// Class[] types = new Class[9]; +// assertArrayEquals(new Class[] { Boolean[].class, Byte[].class, Character[].class, Double[].class, +// Float[].class, Integer[].class, Long[].class, Short[].class, String[].class }, +// rec.tupleBasicTypesFromByteArray(rec.tupleBasicTypesToByteArray(rec.getTuple()), 9)); + + ByteArrayOutputStream buff = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(buff); + for (int i = 0; i < 10000; i++) { + out.write(rec.tupleBasicTypesToByteArray(rec.getTuple())); + } + DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray())); + StreamRecord rec2 = new StreamRecord(); + Long start = System.nanoTime(); + for (int i = 0; i < 10000; i++) { + byte[] byteTypes = new byte[9]; + in.read(byteTypes); + TypeInformation[] basicTypes = rec2.tupleBasicTypesFromByteArray(byteTypes, 9); + TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes); + } System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns"); + + start = System.nanoTime(); + for (int i = 0; i < 10000; i++) { + byte[] byteTypes = new byte[9]; + in.read(byteTypes); + // rec2.tupleBasicTypesFromByteArray(byteTypes, 9); + String types2 = StreamRecord.typeStringFromByteArray(byteTypes, 9); + TypeInformation typeInfo = TupleTypeInfo.parse(types2); + + } + System.out.println("Type copy with String:\t\t" + (System.nanoTime() - start) + " ns"); + } - + } -- GitLab