From a6d09db3859f370f851357efd21bfb50655f9b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Balassi?= Date: Mon, 14 Jul 2014 16:29:05 +0200 Subject: [PATCH] [streaming] getFieldSpeedTest --- .../api/streamrecord/StreamRecord.java | 44 ++++++++---- .../api/streamrecord/StreamRecordTest.java | 67 +++++++++++++++---- 2 files changed, 83 insertions(+), 28 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 6e24314fd7d..c3728acb163 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -95,7 +95,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { tupleBatch = new ArrayList(); } - + public StreamRecord(int numOfFields, int batchSize) { this.numOfFields = numOfFields; this.numOfTuples = 0; @@ -208,6 +208,21 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } + public Object getFieldFast(int tupleNumber, int fieldNumber) + throws NoSuchTupleException, NoSuchFieldException { + Tuple tuple; + try { + tuple = tupleBatch.get(tupleNumber); + } catch (IndexOutOfBoundsException e) { + throw (new NoSuchTupleException()); + } + try { + return tuple.getFieldFast(fieldNumber); + } catch (IndexOutOfBoundsException e) { + throw (new NoSuchFieldException()); + } + } + /** * Get a Boolean from the given field of the first Tuple of the batch * @@ -244,8 +259,9 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @param fieldNumber * Position of the field in the tuple - * @return value of the field as Double * @throws NoSuchTupleException , - * NoSuchFieldException + * @return value of the field as Double + * @throws NoSuchTupleException + * , NoSuchFieldException */ public Double getDouble(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { @@ -259,8 +275,9 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Position of the tuple in the batch * @param fieldNumber * Position of the field in the tuple - * @return value of the field as Double * @throws NoSuchTupleException , - * NoSuchFieldException + * @return value of the field as Double + * @throws NoSuchTupleException + * , NoSuchFieldException */ public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException { @@ -464,7 +481,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { */ public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException { - setInteger(0, fieldNumber,i); + setInteger(0, fieldNumber, i); } /** @@ -669,7 +686,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - public StreamRecord copySerialized() { ByteArrayOutputStream buff = new ByteArrayOutputStream(); @@ -686,7 +702,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { return newRecord; } - + /** * Creates a deep copy of the StreamRecord * @@ -694,13 +710,13 @@ public class StreamRecord implements IOReadableWritable, Serializable { * */ public StreamRecord copy() { - StreamRecord newRecord= new StreamRecord(numOfFields,numOfTuples); - newRecord.uid=new StringValue(uid.getValue()); - - for(Tuple tuple: tupleBatch){ - newRecord.tupleBatch.add(StreamRecord.copyTuple(tuple)); + StreamRecord newRecord = new StreamRecord(numOfFields, numOfTuples); + newRecord.uid = new StringValue(uid.getValue()); + + for (Tuple tuple : tupleBatch) { + newRecord.tupleBatch.add(copyTuple(tuple)); } - + return newRecord; } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index c5dbcfc5b2b..fde6a87426d 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -30,12 +30,14 @@ import org.junit.Test; import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple4; public class StreamRecordTest { @Test public void singleRecordSetGetTest() { - StreamRecord record = new StreamRecord(new Tuple2("Stratosphere", 1)); + StreamRecord record = new StreamRecord(new Tuple2( + "Stratosphere", 1)); assertEquals(2, record.getNumOfFields()); assertEquals(1, record.getNumOfTuples()); @@ -46,19 +48,21 @@ public class StreamRecordTest { record.setInteger(1, 2); assertEquals("Big Data", record.getString(0)); assertEquals((Integer) 2, record.getInteger(1)); - - record.setTuple(new Tuple2("Big Data looks tiny from here.", 2L)); + record.setTuple(new Tuple2( + "Big Data looks tiny from here.", 2L)); assertEquals(2, record.getNumOfFields()); assertEquals(1, record.getNumOfTuples()); assertEquals((Long) 2L, record.getLong(1)); - record.setTuple(new Tuple2("Big Data looks tiny from here.", true)); + record.setTuple(new Tuple2( + "Big Data looks tiny from here.", true)); assertEquals(2, record.getNumOfFields()); assertEquals(1, record.getNumOfTuples()); assertEquals(true, record.getBoolean(1)); - record.setTuple(new Tuple2("Big Data looks tiny from here.", 2.5)); + record.setTuple(new Tuple2( + "Big Data looks tiny from here.", 2.5)); assertEquals(2, record.getNumOfFields()); assertEquals(1, record.getNumOfTuples()); assertEquals((Double) 2.5, record.getDouble(1)); @@ -78,7 +82,8 @@ public class StreamRecordTest { @Test public void batchRecordSetGetTest() { - StreamRecord record = new StreamRecord(new Tuple2(1, 2)); + StreamRecord record = new StreamRecord(new Tuple2(1, + 2)); record.addTuple(new Tuple2(2, 2)); try { record.addTuple(new Tuple1("4")); @@ -100,7 +105,6 @@ public class StreamRecordTest { @Test public void copyTest() { - // TODO:test ID copy StreamRecord a = new StreamRecord(new Tuple1("Big")); StreamRecord b = a.copy(); assertTrue(a.getField(0).equals(b.getField(0))); @@ -109,16 +113,48 @@ public class StreamRecordTest { b.setTuple(new Tuple1("Data")); assertFalse(a.getId().equals(b.getId())); assertFalse(a.getField(0).equals(b.getField(0))); + final int ITERATION = 10000; StreamRecord c = new StreamRecord(new Tuple1("Big")); long t = System.nanoTime(); - c.copySerialized(); - System.out.println("Serialized copy:\t" + (System.nanoTime() - t)); + for (int i = 0; i < ITERATION; i++) { + c.copySerialized(); + } + long t2 = System.nanoTime() - t; + System.out.println("Serialized copy:\t" + t2 + " ns"); + + t = System.nanoTime(); + for (int i = 0; i < ITERATION; i++) { + c.copy(); + } + t2 = System.nanoTime() - t; + System.out.println("Copy:\t" + t2 + " ns"); + + } + + @Test + public void getFieldSpeedTest() { + + final int ITERATION = 10000; + + StreamRecord record = new StreamRecord( + new Tuple4(0, 42L, + "Stratosphere", "Streaming")); + + long t = System.nanoTime(); + for (int i = 0; i < ITERATION; i++) { + record.getField(0, 3); + } + long t2 = System.nanoTime() - t; + System.out.println("getField:\t" + t2 + " ns"); t = System.nanoTime(); - c.copy(); - System.out.println("New copy:\t" + (System.nanoTime() - t)); + for (int i = 0; i < ITERATION; i++) { + record.getFieldFast(0, 3); + } + t2 = System.nanoTime() - t; + System.out.println("getFieldFast:\t" + t2 + " ns"); } @@ -158,16 +194,19 @@ public class StreamRecordTest { int num = 42; String str = "above clouds"; - StreamRecord rec = new StreamRecord(new Tuple2(num, str)); + StreamRecord rec = new StreamRecord(new Tuple2(num, + str)); try { rec.write(out); - DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray())); + DataInputStream in = new DataInputStream(new ByteArrayInputStream( + buff.toByteArray())); StreamRecord newRec = new StreamRecord(); newRec.read(in); @SuppressWarnings("unchecked") - Tuple2 tupleOut = (Tuple2) newRec.getTuple(0); + Tuple2 tupleOut = (Tuple2) newRec + .getTuple(0); assertEquals(tupleOut.getField(0), 42); } catch (IOException e) { -- GitLab