diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index bbd7dea562cb03436f509e8d1b7f3a1e37f63654..70d60acbf151529c8cbd84dc3db5e186bf65bf3f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -33,7 +33,7 @@ import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.streaming.api.invokable.RecordInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; -import eu.stratosphere.streaming.api.streamrecord.RecordSizeMismatchException; +import eu.stratosphere.streaming.api.streamrecord.TupleSizeMismatchException; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.faulttolerance.AckEvent; import eu.stratosphere.streaming.faulttolerance.AckEventListener; @@ -201,7 +201,7 @@ public final class StreamComponentHelper { threadSafePublish(new AckEvent(id), input); log.debug("ACK: " + id + " -- " + name); // TODO: write an exception class to throw forward - } catch (RecordSizeMismatchException e) { + } catch (TupleSizeMismatchException e) { throw (e); } catch (Exception e) { e.printStackTrace(); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchRecordException.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchTupleException.java similarity index 93% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchRecordException.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchTupleException.java index dfcad40c8ba6cafbb31bfa08bc40687d973f7f13..ee8b3672825f569d793fec6a07054f0e643436bd 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchRecordException.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/NoSuchTupleException.java @@ -15,7 +15,7 @@ package eu.stratosphere.streaming.api.streamrecord; -public class NoSuchRecordException extends StreamRecordException { +public class NoSuchTupleException extends StreamRecordException { private static final long serialVersionUID = 4935457355434561574L; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 2399cd08fa93453c69774af8464888cee5e40e7e..be3c322511642cd082ed78d1fe4b8bef9ef12892 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -63,22 +63,24 @@ import eu.stratosphere.types.StringValue; * Object for storing serializable records in batch (single records are * represented batches with one element) used for sending records between task * objects in Stratosphere stream processing. The elements of the batch are - * Value arrays. + * Tuples. */ // TODO: update documentation public class StreamRecord implements IOReadableWritable, Serializable { private static final long serialVersionUID = 1L; - private List recordBatch; - private StringValue uid = new StringValue(""); + private List tupleBatch; + private StringValue uid = new StringValue(); private int numOfFields; - private int numOfRecords; + private int numOfTuples; - private static final Class[] CLASSES = new Class[] { Tuple1.class, Tuple2.class, - Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, - Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, - Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, - Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class }; + private static final Class[] CLASSES = new Class[] { Tuple1.class, + Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, + Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, + Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, + Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, + Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, + Tuple22.class }; // TODO implement equals, clone /** @@ -89,55 +91,53 @@ public class StreamRecord implements IOReadableWritable, Serializable { public StreamRecord(int numOfFields) { this.numOfFields = numOfFields; - this.numOfRecords = 0; - recordBatch = new ArrayList(); + this.numOfTuples = 0; + tupleBatch = new ArrayList(); } /** - * Creates a new empty batch of records and sets the field number to the - * given number, and the number of records to the given number. Setting - * batchSize is just for optimization, records need to be added. + * Creates a new batch of records containing only the given Tuple as element + * and sets desired batch size. * - * @param length - * Number of fields in the records + * @param tuple + * Tuple to be pushed to the record * @param batchSize - * Number of records + * Number of tuples in the record */ public StreamRecord(Tuple tuple, int batchSize) { numOfFields = tuple.getArity(); - numOfRecords = 1; - recordBatch = new ArrayList(batchSize); - recordBatch.add(tuple); + numOfTuples = 1; + tupleBatch = new ArrayList(batchSize); + tupleBatch.add(tuple); } /** - * Given an array of Values, creates a new a record batch containing the - * array as its first element + * Given a Tuple, creates a new a record batch containing the Tuple as its + * only element * - * @param values - * Array containing the Values for the first record in the batch + * @param tuple + * Tuple to be pushed to the record */ public StreamRecord(Tuple tuple) { this(tuple, 1); } /** - * @return Number of fields in the records + * @return Number of fields in the tuples */ public int getNumOfFields() { return numOfFields; } /** - * @return Number of records in the batch + * @return Number of tuples in the batch */ - public int getNumOfRecords() { - return numOfRecords; + public int getNumOfTuples() { + return numOfTuples; } - // TODO: use UUID /** * Set the ID of the StreamRecord object * @@ -145,9 +145,10 @@ public class StreamRecord implements IOReadableWritable, Serializable { * ID of the emitting task * @return The StreamRecord object */ + // TODO: consider sequential ids public StreamRecord setId(String channelID) { UUID uuid = UUID.randomUUID(); - uid.setValue(channelID + "-" + uuid.toString());// rnd.nextInt(10)); + uid.setValue(channelID + "-" + uuid.toString()); return this; } @@ -159,214 +160,390 @@ public class StreamRecord implements IOReadableWritable, Serializable { } /** - * Returns the Value of a field in the given position of a specific record - * in the batch + * Returns the value of a field in the given position of a specific tuple in + * the batch as an object, cast needed to obtain a typed version * - * @param recordNumber - * Position of the record in the batch + * @param tupleNumber + * Position of the tuple in the batch * @param fieldNumber - * Position of the field in the record - * @return Value of the field + * Position of the field in the tuple + * @return value of the field */ - public Object getField(int recordNumber, int fieldNumber) { + public Object getField(int tupleNumber, int fieldNumber) { try { - return recordBatch.get(recordNumber).getField(fieldNumber); + return tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } /** - * Returns the Value of a field in the given position of the first record in - * the batch + * Returns the value of a field in the given position of the first tuple in + * the batch as an object, cast needed to obtain a typed version * * @param fieldNumber - * Position of the field in the record - * @return Value of the field + * Position of the field in the tuple + * @return value of the field */ public Object getField(int fieldNumber) { try { - return recordBatch.get(0).getField(fieldNumber); + return tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } + /** + * Get a String from the given field of the first Tuple of the batch + * + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as String + */ public String getString(int fieldNumber) { try { - return (String) recordBatch.get(0).getField(fieldNumber); + return (String) tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } + /** + * Get an Integer from the given field of the first Tuple of the batch + * + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Integer + */ public Integer getInteger(int fieldNumber) { try { - return (Integer) recordBatch.get(0).getField(fieldNumber); + return (Integer) tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } + /** + * Get a Long from the given field of the first Tuple of the batch + * + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Long + */ public Long getLong(int fieldNumber) { try { - return (Long) recordBatch.get(0).getField(fieldNumber); + return (Long) tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } + /** + * Get a Boolean from the given field of the first Tuple of the batch + * + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Boolean + */ public Boolean getBoolean(int fieldNumber) { try { - return (Boolean) recordBatch.get(0).getField(fieldNumber); + return (Boolean) tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } + /** + * Get a Double from the given field of the first Tuple of the batch + * + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Double + */ public Double getDouble(int fieldNumber) { try { - return (Double) recordBatch.get(0).getField(fieldNumber); + return (Double) tupleBatch.get(0).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } - public String getString(int recordNumber, int fieldNumber) { + /** + * Get a String from the given field of the specified Tuple of the batch + * + * @param tupleNumber + * Position of the tuple in the batch + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as String + */ + public String getString(int tupleNumber, int fieldNumber) { try { - return (String) recordBatch.get(recordNumber).getField(fieldNumber); + return (String) tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } - public Integer getInteger(int recordNumber, int fieldNumber) { + /** + * Get an Integer from the given field of the specified Tuple of the batch + * + * @param tupleNumber + * Position of the tuple in the batch + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Integer + */ + public Integer getInteger(int tupleNumber, int fieldNumber) { try { - return (Integer) recordBatch.get(recordNumber).getField(fieldNumber); + return (Integer) tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } - public Long getLong(int recordNumber, int fieldNumber) { + /** + * Get a Long from the given field of the specified Tuple of the batch + * + * @param tupleNumber + * Position of the tuple in the batch + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Long + */ + public Long getLong(int tupleNumber, int fieldNumber) { try { - return (Long) recordBatch.get(recordNumber).getField(fieldNumber); + return (Long) tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } - public Boolean getBoolean(int recordNumber, int fieldNumber) { + /** + * Get a Boolean from the given field of the specified Tuple of the batch + * + * @param tupleNumber + * Position of the tuple in the batch + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Boolean + */ + public Boolean getBoolean(int tupleNumber, int fieldNumber) { try { - return (Boolean) recordBatch.get(recordNumber).getField(fieldNumber); + return (Boolean) tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } - public Double getDouble(int recordNumber, int fieldNumber) { + /** + * Get a Double from the given field of the specified Tuple of the batch + * + * @param tupleNumber + * Position of the tuple in the batch + * @param fieldNumber + * Position of the field in the tuple + * @return value of the field as Double + */ + public Double getDouble(int tupleNumber, int fieldNumber) { try { - return (Double) recordBatch.get(recordNumber).getField(fieldNumber); + return (Double) tupleBatch.get(tupleNumber).getField(fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } /** - * Sets a field in the given position of a specific record in the batch + * Sets a field in the given position of a specific tuple in the batch * - * @param recordNumber - * Position of record in batch + * @param tupleNumber + * Position of tuple in batch * @param fieldNumber - * Position of field in record - * @param value - * Value to set + * Position of field in tuple + * @param o + * New value */ - public void setField(int recordNumber, int fieldNumber, Object o) { + public void setField(int tupleNumber, int fieldNumber, Object o) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(o, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setString(int recordNumber, int fieldNumber, String o) { + /** + * Sets a String field in the given position of a specific tuple in the + * batch + * + * @param tupleNumber + * Position of tuple in batch + * @param fieldNumber + * Position of field in tuple + * @param str + * New value + */ + public void setString(int tupleNumber, int fieldNumber, String str) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(str, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setInteger(int recordNumber, int fieldNumber, Integer o) { + /** + * Sets an Integer field in the given position of a specific tuple in the + * batch + * + * @param tupleNumber + * Position of tuple in batch + * @param fieldNumber + * Position of field in tuple + * @param i + * New value + */ + public void setInteger(int tupleNumber, int fieldNumber, Integer i) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(i, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setLong(int recordNumber, int fieldNumber, Long o) { + /** + * Sets a Long field in the given position of a specific tuple in the batch + * + * @param tupleNumber + * Position of tuple in batch + * @param fieldNumber + * Position of field in tuple + * @param l + * New value + */ + public void setLong(int tupleNumber, int fieldNumber, Long l) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(l, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setDouble(int recordNumber, int fieldNumber, Double o) { + /** + * Sets a Double field in the given position of a specific tuple in the + * batch + * + * @param tupleNumber + * Position of tuple in batch + * @param fieldNumber + * Position of field in tuple + * @param d + * New value + */ + public void setDouble(int tupleNumber, int fieldNumber, Double tuple) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(tuple, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setBoolean(int recordNumber, int fieldNumber, Boolean o) { + /** + * Sets a Boolean field in the given position of a specific tuple in the + * batch + * + * @param tupleNumber + * Position of tuple in batch + * @param fieldNumber + * Position of field in tuple + * @param b + * New value + */ + public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) { try { - recordBatch.get(recordNumber).setField(o, fieldNumber); + tupleBatch.get(tupleNumber).setField(b, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setString(int fieldNumber, String o) { + /** + * Sets a String field in the given position of the first tuple in the batch + * + * @param fieldNumber + * Position of field in tuple + * @param str + * New value + */ + public void setString(int fieldNumber, String str) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(str, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setInteger(int fieldNumber, Integer o) { + /** + * Sets an Integer field in the given position of the first tuple in the + * batch + * + * @param fieldNumber + * Position of field in tuple + * @param i + * New value + */ + public void setInteger(int fieldNumber, Integer i) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(i, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setLong(int fieldNumber, Long o) { + /** + * Sets a Long field in the given position of the first tuple in the batch + * + * @param fieldNumber + * Position of field in tuple + * @param l + * New value + */ + public void setLong(int fieldNumber, Long l) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(l, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setDouble(int fieldNumber, Double o) { + /** + * Sets a Double field in the given position of the first tuple in the batch + * + * @param fieldNumber + * Position of field in tuple + * @param d + * New value + */ + public void setDouble(int fieldNumber, Double d) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(d, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } - public void setBoolean(int fieldNumber, Boolean o) { + /** + * Sets a Boolean field in the given position of the first tuple in the + * batch + * + * @param fieldNumber + * Position of field in tuple + * @param b + * New value + */ + public void setBoolean(int fieldNumber, Boolean b) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(b, fieldNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } @@ -375,50 +552,51 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @param fieldNumber * Position of the field in the record - * @param value - * Value to set the given field to + * @param o + * New value */ public void setField(int fieldNumber, Object o) { try { - recordBatch.get(0).setField(o, fieldNumber); + tupleBatch.get(0).setField(o, fieldNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchFieldException()); } } /** - * @param recordNumber + * @param tupleNumber * Position of the record in the batch - * @return Value array containing the fields of the record + * @return Chosen tuple */ - public Tuple getRecord(int recordNumber) { + public Tuple getTuple(int tupleNumber) { try { - return recordBatch.get(recordNumber); + return tupleBatch.get(tupleNumber); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } /** - * @return Value array containing the fields of first the record + * @return First tuple of the batch */ - public Tuple getRecord() { - return getRecord(0); + public Tuple getTuple() { + return getTuple(0); } + //TODO: doc from here on public void getTupleInto(Tuple tuple) { if (tuple.getArity() == numOfFields) { try { - Tuple source = recordBatch.get(0); + Tuple source = tupleBatch.get(0); for (int i = 0; i < numOfFields; i++) { tuple.setField(source.getField(i), i); } } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } else { - throw (new RecordSizeMismatchException()); + throw (new TupleSizeMismatchException()); } } @@ -434,12 +612,12 @@ public class StreamRecord implements IOReadableWritable, Serializable { public void setRecord(int recordNumber, Tuple tuple) { if (tuple.getArity() == numOfFields) { try { - recordBatch.set(recordNumber, tuple); + tupleBatch.set(recordNumber, tuple); } catch (IndexOutOfBoundsException e) { - throw (new NoSuchRecordException()); + throw (new NoSuchTupleException()); } } else { - throw (new RecordSizeMismatchException()); + throw (new TupleSizeMismatchException()); } } @@ -451,14 +629,14 @@ public class StreamRecord implements IOReadableWritable, Serializable { */ public void setRecord(Tuple tuple) { if (tuple.getArity() == numOfFields) { - if (numOfRecords != 1) { - recordBatch = new ArrayList(1); - recordBatch.add(tuple); + if (numOfTuples != 1) { + tupleBatch = new ArrayList(1); + tupleBatch.add(tuple); } else { - recordBatch.set(0, tuple); + tupleBatch.set(0, tuple); } } else { - throw (new RecordSizeMismatchException()); + throw (new TupleSizeMismatchException()); } } @@ -471,10 +649,10 @@ public class StreamRecord implements IOReadableWritable, Serializable { */ public void addRecord(Tuple tuple) { if (tuple.getArity() == numOfFields) { - recordBatch.add(tuple); - numOfRecords++; + tupleBatch.add(tuple); + numOfTuples++; } else { - throw new RecordSizeMismatchException(); + throw new TupleSizeMismatchException(); } } @@ -491,7 +669,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { StreamRecord newRecord = new StreamRecord(); try { this.write(out); - DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray())); + DataInputStream in = new DataInputStream(new ByteArrayInputStream( + buff.toByteArray())); newRecord.read(in); } catch (Exception e) { @@ -526,7 +705,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { basicTypes[i] = tuple.getField(i).getClass(); basicTypeNames.append(basicTypes[i].getName() + ","); } - TypeInformation typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(basicTypes); + TypeInformation typeInfo = TupleTypeInfo + .getBasicTupleTypeInfo(basicTypes); StringValue typeVal = new StringValue(basicTypeNames.toString()); @@ -561,11 +741,13 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - TypeInformation typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(basicTypes); + TypeInformation typeInfo = TupleTypeInfo + .getBasicTupleTypeInfo(basicTypes); TupleSerializer tupleSerializer = (TupleSerializer) typeInfo .createSerializer(); - DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer); + DeserializationDelegate dd = new DeserializationDelegate( + tupleSerializer); dd.setInstance(tupleSerializer.createInstance()); dd.read(in); return dd.getInstance(); @@ -579,9 +761,9 @@ public class StreamRecord implements IOReadableWritable, Serializable { (new IntValue(numOfFields)).write(out); // Write the number of records with an IntValue - (new IntValue(numOfRecords)).write(out); + (new IntValue(numOfTuples)).write(out); - for (Tuple tuple : recordBatch) { + for (Tuple tuple : tupleBatch) { writeTuple(tuple, out); } } @@ -598,20 +780,20 @@ public class StreamRecord implements IOReadableWritable, Serializable { // Get the number of records IntValue numOfRecordsValue = new IntValue(0); numOfRecordsValue.read(in); - numOfRecords = numOfRecordsValue.getValue(); + numOfTuples = numOfRecordsValue.getValue(); // Make sure the fields have numOfFields elements - recordBatch = new ArrayList(); + tupleBatch = new ArrayList(); - for (int k = 0; k < numOfRecords; ++k) { - recordBatch.add(readTuple(in)); + for (int k = 0; k < numOfTuples; ++k) { + tupleBatch.add(readTuple(in)); } } public String toString() { StringBuilder outputString = new StringBuilder("["); - for (Tuple tuple : recordBatch) { + for (Tuple tuple : tupleBatch) { outputString.append(tuple + ","); } outputString.append("]"); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/RecordSizeMismatchException.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/TupleSizeMismatchException.java similarity index 93% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/RecordSizeMismatchException.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/TupleSizeMismatchException.java index 41299a759370a23101e00d670248a32caad43ca5..9b929eca68d00aad5e0a4aca046cb764ed910e7e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/RecordSizeMismatchException.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/TupleSizeMismatchException.java @@ -13,7 +13,7 @@ package eu.stratosphere.streaming.api.streamrecord; -public class RecordSizeMismatchException extends StreamRecordException { +public class TupleSizeMismatchException extends StreamRecordException { /** * Serial version UID for serialization interoperability. diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java index 75c6647b51f486a84000b6d0df310f8fdebbfeb8..7988607991a81a70d39afb9d973fabe465a21ded 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java @@ -31,7 +31,7 @@ public class BatchWordCountSplitter extends UserTaskInvokable { @Override public void invoke(StreamRecord record) throws Exception { - int numberOfRecords = record.getNumOfRecords(); + int numberOfRecords = record.getNumOfTuples(); for (int i = 0; i < numberOfRecords; ++i) { words = record.getString(0).split(" "); timestamp=record.getLong(1); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index c0eb1f831cf78cdd22a959b377b0a47e69db4d0e..3b2681a0ea2322bc32785df53349bf12bf0e6b17 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -38,7 +38,7 @@ public class StreamRecordTest { StreamRecord record = new StreamRecord(new Tuple2("Stratosphere", 1)); assertEquals(2, record.getNumOfFields()); - assertEquals(1, record.getNumOfRecords()); + assertEquals(1, record.getNumOfTuples()); assertEquals("Stratosphere", record.getString(0)); assertEquals((Integer) 1, record.getInteger(1)); @@ -47,17 +47,17 @@ public class StreamRecordTest { record.setRecord(new Tuple2("Big Data looks tiny from here.", 2L)); assertEquals(2, record.getNumOfFields()); - assertEquals(1, record.getNumOfRecords()); + assertEquals(1, record.getNumOfTuples()); assertEquals((Long) 2L, record.getLong(1)); record.setRecord(new Tuple2("Big Data looks tiny from here.", true)); assertEquals(2, record.getNumOfFields()); - assertEquals(1, record.getNumOfRecords()); + assertEquals(1, record.getNumOfTuples()); assertEquals(true, record.getBoolean(1)); record.setRecord(new Tuple2("Big Data looks tiny from here.", 2.5)); assertEquals(2, record.getNumOfFields()); - assertEquals(1, record.getNumOfRecords()); + assertEquals(1, record.getNumOfTuples()); assertEquals((Double) 2.5, record.getDouble(1)); Tuple2 tuple = new Tuple2(); @@ -80,11 +80,11 @@ public class StreamRecordTest { try { record.addRecord(new Tuple1("4")); fail(); - } catch (RecordSizeMismatchException e) { + } catch (TupleSizeMismatchException e) { } assertEquals(2, record.getNumOfFields()); - assertEquals(2, record.getNumOfRecords()); + assertEquals(2, record.getNumOfTuples()); assertEquals((Integer) 1, record.getInteger(0, 0)); assertEquals((Integer) 2, record.getInteger(1, 1)); @@ -92,7 +92,7 @@ public class StreamRecordTest { assertEquals(-1, record.getField(1, 0)); assertEquals(2, record.getNumOfFields()); - assertEquals(2, record.getNumOfRecords()); + assertEquals(2, record.getNumOfTuples()); } @Test @@ -115,20 +115,20 @@ public class StreamRecordTest { try { a.setRecord(4, new Tuple1("Data")); fail(); - } catch (NoSuchRecordException e) { + } catch (NoSuchTupleException e) { } try { a.setRecord(new Tuple2("Data", "Stratosphere")); fail(); - } catch (RecordSizeMismatchException e) { + } catch (TupleSizeMismatchException e) { } StreamRecord b = new StreamRecord(); try { b.addRecord(new Tuple2("Data", "Stratosphere")); fail(); - } catch (RecordSizeMismatchException e) { + } catch (TupleSizeMismatchException e) { } try { @@ -153,7 +153,7 @@ public class StreamRecordTest { StreamRecord newRec = new StreamRecord(); newRec.read(in); - Tuple2 tupleOut = (Tuple2) newRec.getRecord(0); + Tuple2 tupleOut = (Tuple2) newRec.getTuple(0); assertEquals(tupleOut.getField(0), 42); } catch (IOException e) {