diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index bfc12c32ea058ff7b1e5a84557d194dcc454eb25..582ddd7a960f55df0546bb0323b0167ed3a69062 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -5,14 +5,13 @@
4.0.0
eu.stratosphere
- 0.2-SNAPSHOT
+ 0.5
stratosphere-streaming
stratosphere-streaming
jar
- 0.5
UTF-8
UTF-8
@@ -30,32 +29,32 @@
eu.stratosphere
stratosphere-core
- ${stratosphere.version}
+ ${project.version}
eu.stratosphere
stratosphere-tests
- ${stratosphere.version}
+ ${project.version}
eu.stratosphere
stratosphere-compiler
- ${stratosphere.version}
+ ${project.version}
eu.stratosphere
stratosphere-runtime
- ${stratosphere.version}
+ ${project.version}
eu.stratosphere
stratosphere-clients
- ${stratosphere.version}
+ ${project.version}
eu.stratosphere
stratosphere-java
- ${stratosphere.version}
+ ${project.version}
junit
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
new file mode 100644
index 0000000000000000000000000000000000000000..73182558f347f2be0937a2bc6a8ee21718477464
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/DataStream.java
@@ -0,0 +1,29 @@
+package eu.stratosphere.api.datastream;
+
+import eu.stratosphere.types.TypeInformation;
+
+public class DataStream {
+
+ private final StreamExecutionEnvironment context;
+
+ private final TypeInformation type;
+
+ protected DataStream(StreamExecutionEnvironment context, TypeInformation type) {
+ if (context == null) {
+ throw new NullPointerException("context is null");
+ }
+
+ if (type == null) {
+ throw new NullPointerException("type is null");
+ }
+
+ this.context = context;
+ this.type = type;
+ }
+
+ public TypeInformation getType() {
+ return this.type;
+ }
+
+
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SingleStreamInputOperator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SingleStreamInputOperator.java
new file mode 100644
index 0000000000000000000000000000000000000000..456a167cf3560c1693b70f142ec119f053a61d71
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/SingleStreamInputOperator.java
@@ -0,0 +1,12 @@
+package eu.stratosphere.api.datastream;
+
+import eu.stratosphere.types.TypeInformation;
+
+public abstract class SingleStreamInputOperator> extends StreamOperator {
+
+ protected SingleStreamInputOperator(StreamExecutionEnvironment context,
+ TypeInformation type) {
+ super(context, type);
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
new file mode 100644
index 0000000000000000000000000000000000000000..10afc7ceb413b17ff18ab641a802da0073fe615d
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamExecutionEnvironment.java
@@ -0,0 +1,21 @@
+package eu.stratosphere.api.datastream;
+
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.streaming.util.ClusterUtil;
+
+public class StreamExecutionEnvironment {
+ JobGraphBuilder jobGraphBuilder;
+
+ public StreamExecutionEnvironment() {
+ jobGraphBuilder = new JobGraphBuilder("jobGraph",FaultToleranceType.NONE);
+ }
+
+// public static StreamExecutionEnvironment getLocalEnvironment() {
+// return new StreamExecutionEnvironment();
+// }
+
+ public void execute() {
+ ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
+ }
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamOperator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamOperator.java
new file mode 100644
index 0000000000000000000000000000000000000000..e4a637e4ea99deb31edfa8dfba65da7ffad5437f
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/StreamOperator.java
@@ -0,0 +1,10 @@
+package eu.stratosphere.api.datastream;
+
+import eu.stratosphere.types.TypeInformation;
+
+public abstract class StreamOperator> extends DataStream {
+
+ protected StreamOperator(StreamExecutionEnvironment context, TypeInformation type) {
+ super(context, type);
+ }
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
new file mode 100755
index 0000000000000000000000000000000000000000..18234ed78a94c71288381b6dc500d34e56373c63
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
@@ -0,0 +1,42 @@
+package eu.stratosphere.streaming.api;
+
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.util.Collector;
+
+public class StreamCollector implements Collector {
+
+ protected ArrayStreamRecord streamRecord;
+ protected int batchSize;
+ protected int counter = 0;
+ protected int channelID;
+
+ public StreamCollector(int batchSize, int channelID) {
+ this.batchSize = batchSize;
+ this.streamRecord = new ArrayStreamRecord(batchSize);
+ this.channelID = channelID;
+ }
+
+ @Override
+ public void collect(Tuple tuple) {
+ streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
+ counter++;
+ if (counter >= batchSize) {
+ counter = 0;
+ streamRecord.setId(channelID);
+ emit(streamRecord);
+ }
+ }
+
+ private void emit(ArrayStreamRecord streamRecord) {
+ System.out.println(streamRecord);
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
new file mode 100755
index 0000000000000000000000000000000000000000..eaa0d4ea7d203a519b72307d0aadac64854d28a5
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
@@ -0,0 +1,306 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.api.streamrecord;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple10;
+import eu.stratosphere.api.java.tuple.Tuple11;
+import eu.stratosphere.api.java.tuple.Tuple12;
+import eu.stratosphere.api.java.tuple.Tuple13;
+import eu.stratosphere.api.java.tuple.Tuple14;
+import eu.stratosphere.api.java.tuple.Tuple15;
+import eu.stratosphere.api.java.tuple.Tuple16;
+import eu.stratosphere.api.java.tuple.Tuple17;
+import eu.stratosphere.api.java.tuple.Tuple18;
+import eu.stratosphere.api.java.tuple.Tuple19;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple20;
+import eu.stratosphere.api.java.tuple.Tuple21;
+import eu.stratosphere.api.java.tuple.Tuple22;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.tuple.Tuple6;
+import eu.stratosphere.api.java.tuple.Tuple7;
+import eu.stratosphere.api.java.tuple.Tuple8;
+import eu.stratosphere.api.java.tuple.Tuple9;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
+import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
+
+/**
+ * Object for storing serializable records in batch (single records are
+ * represented batches with one element) used for sending records between task
+ * objects in Stratosphere stream processing. The elements of the batch are
+ * Tuples.
+ */
+public class ArrayStreamRecord implements IOReadableWritable, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private Tuple[] tupleBatch;
+ private UID uid = new UID();
+ private int batchSize;
+
+ private TupleSerializer tupleSerializer;
+ SerializationDelegate serializationDelegate;
+
+ private static final Class>[] CLASSES = new Class>[] { Tuple1.class, Tuple2.class,
+ Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+ Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+ Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+ Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
+
+ /**
+ * Creates a new empty instance for read
+ */
+ public ArrayStreamRecord() {
+ }
+
+ public ArrayStreamRecord(int batchsize) {
+ this.batchSize = batchsize;
+ tupleBatch = new Tuple[batchsize];
+ }
+
+ public ArrayStreamRecord(ArrayStreamRecord record) {
+ tupleBatch = new Tuple[record.batchSize];
+ this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
+ for (int i = 0; i < record.getBatchSize(); ++i) {
+ this.tupleBatch[i] = copyTuple(record.getTuple(i));
+ }
+ this.batchSize = tupleBatch.length;
+ }
+
+ /**
+ * Creates a new batch of records containing the given Tuple array as
+ * elements
+ *
+ * @param tupleList
+ * Tuples to bes stored in the StreamRecord
+ */
+ public ArrayStreamRecord(Tuple[] tupleArray) {
+ this.batchSize = tupleArray.length;
+ tupleBatch = tupleArray;
+ }
+
+ public void setTupleTypeInfo(TupleTypeInfo typeInfo) {
+ tupleSerializer = (TupleSerializer) typeInfo.createSerializer();
+ serializationDelegate = new SerializationDelegate(tupleSerializer);
+ }
+
+ /**
+ * @return Number of tuples in the batch
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * @return The ID of the object
+ */
+ public UID getId() {
+ return uid;
+ }
+
+ /**
+ * Set the ID of the StreamRecord object
+ *
+ * @param channelID
+ * ID of the emitting task
+ * @return The StreamRecord object
+ */
+ public ArrayStreamRecord setId(int channelID) {
+ uid = new UID(channelID);
+ return this;
+ }
+
+ /**
+ * Returns an iterable over the tuplebatch
+ *
+ * @return batch iterable
+ */
+ public Iterable getBatchIterable() {
+ return (Iterable) Arrays.asList(tupleBatch);
+ }
+
+ /**
+ * @param tupleNumber
+ * Position of the record in the batch
+ * @return Chosen tuple
+ * @throws NoSuchTupleException
+ * the Tuple does not have this many fields
+ */
+ public Tuple getTuple(int tupleNumber) throws NoSuchTupleException {
+ try {
+ return tupleBatch[tupleNumber];
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ }
+
+ /**
+ * Sets a tuple at the given position in the batch with the given tuple
+ *
+ * @param tupleNumber
+ * Position of tuple in the batch
+ * @param tuple
+ * Value to set
+ * @throws NoSuchTupleException
+ * , TupleSizeMismatchException
+ */
+ public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
+ try {
+ tupleBatch[tupleNumber] = tuple;
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+
+ }
+
+ /**
+ * Creates a deep copy of the StreamRecord
+ *
+ * @return Copy of the StreamRecord
+ *
+ */
+ public ArrayStreamRecord copy() {
+ ArrayStreamRecord newRecord = new ArrayStreamRecord(batchSize);
+
+ newRecord.uid = new UID(Arrays.copyOf(uid.getId(), 20));
+
+ for (int i = 0; i < batchSize; i++) {
+ newRecord.tupleBatch[i] = copyTuple(tupleBatch[i]);
+ }
+
+ return newRecord;
+ }
+
+ /**
+ * Creates deep copy of Tuple
+ *
+ * @param tuple
+ * Tuple to copy
+ * @return Copy of the tuple
+ */
+ public static Tuple copyTuple(Tuple tuple) {
+ // TODO: implement deep copy for arrays
+ int numofFields = tuple.getArity();
+ Tuple newTuple = null;
+ try {
+ newTuple = (Tuple) CLASSES[numofFields - 1].newInstance();
+
+ for (int i = 0; i < numofFields; i++) {
+ Class extends Object> type = tuple.getField(i).getClass();
+ if (type.isArray()) {
+ if (type.equals(Boolean[].class)) {
+ Boolean[] arr = (Boolean[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Byte[].class)) {
+ Byte[] arr = (Byte[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Character[].class)) {
+ Character[] arr = (Character[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Double[].class)) {
+ Double[] arr = (Double[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Float[].class)) {
+ Float[] arr = (Float[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Integer[].class)) {
+ Integer[] arr = (Integer[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Long[].class)) {
+ Long[] arr = (Long[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Short[].class)) {
+ Short[] arr = (Short[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(String[].class)) {
+ String[] arr = (String[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ }
+ newTuple.setField(tuple.getField(i), i);
+ } else {
+ newTuple.setField(tuple.getField(i), i);
+ }
+ }
+ } catch (InstantiationException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return newTuple;
+ }
+
+ /**
+ * Creates a String representation as a list of tuples
+ */
+ public String toString() {
+ StringBuilder outputString = new StringBuilder("[");
+
+ String prefix = "";
+
+ for (Tuple tuple : tupleBatch) {
+ outputString.append(prefix);
+ prefix = ",";
+ outputString.append(tuple.toString());
+ }
+ outputString.append("]");
+ return outputString.toString();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+
+ uid.write(out);
+ out.writeInt(batchSize);
+
+ for (Tuple tuple : tupleBatch) {
+ serializationDelegate.setInstance(tuple);
+ serializationDelegate.write(out);
+ }
+ }
+
+ /**
+ * Read method definition for the IOReadableWritable interface
+ */
+ @Override
+ public void read(DataInput in) throws IOException {
+ uid = new UID();
+ uid.read(in);
+ batchSize = in.readInt();
+ tupleBatch = new Tuple[batchSize];
+ DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer);
+
+ for (int k = 0; k < batchSize; ++k) {
+ dd.setInstance(tupleSerializer.createInstance());
+ dd.read(in);
+ tupleBatch[k] = dd.getInstance();
+ }
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
index e31e440b75c8f65ffeb87fb4e4fe805cb1cf5091..58946831a5edd2b9fea297935ddf133a54745708 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
@@ -15,12 +15,8 @@
package eu.stratosphere.streaming.api.streamrecord;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -50,10 +46,7 @@ import eu.stratosphere.api.java.tuple.Tuple6;
import eu.stratosphere.api.java.tuple.Tuple7;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9;
-import eu.stratosphere.api.java.typeutils.BasicArrayTypeInfo;
-import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
-import eu.stratosphere.types.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
@@ -70,10 +63,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private List tupleBatch;
private UID uid = new UID();
- private int numOfFields;
- private int numOfTuples;
private int batchSize;
+ private TupleTypeInfo typeInfo;
+ private TupleSerializer tupleSerializer;
+ SerializationDelegate serializationDelegate;
+
private static final Class>[] CLASSES = new Class>[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
@@ -86,59 +81,19 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
- /**
- * Creates empty StreamRecord with number of fields set
- *
- * @param numOfFields
- * number of fields
- */
- public StreamRecord(int numOfFields) {
- this.numOfFields = numOfFields;
- this.numOfTuples = 0;
- this.batchSize = 1;
- tupleBatch = new ArrayList(batchSize);
- }
-
- /**
- * Creates empty StreamRecord with number of fields and batch size set
- *
- * @param numOfFields
- * Number of fields in the tuples
- * @param batchSize
- * Batch size
- */
- public StreamRecord(int numOfFields, int batchSize) {
- this.numOfFields = numOfFields;
- this.numOfTuples = 0;
- this.batchSize = batchSize;
- tupleBatch = new ArrayList(batchSize);
+ public StreamRecord(int batchsize) {
+ this.batchSize = batchsize;
+ tupleBatch = new ArrayList(batchsize);
+ initRecords();
}
public StreamRecord(StreamRecord record) {
- this.numOfFields = record.getNumOfFields();
- this.numOfTuples = 0;
tupleBatch = new ArrayList();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
- for (int i = 0; i < record.getNumOfTuples(); ++i) {
+ for (int i = 0; i < record.getBatchSize(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
- }
-
- /**
- * Creates a new batch of records containing only the given Tuple as element
- * and sets desired batch size.
- *
- * @param tuple
- * Tuple to be pushed to the record
- * @param batchSize
- * Number of tuples in the record
- */
- public StreamRecord(Tuple tuple, int batchSize) {
- numOfFields = tuple.getArity();
- numOfTuples = 1;
- this.batchSize = batchSize;
- tupleBatch = new ArrayList(batchSize);
- tupleBatch.add(tuple);
+ this.batchSize = tupleBatch.size();
}
/**
@@ -149,52 +104,21 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Tuples to bes stored in the StreamRecord
*/
public StreamRecord(List tupleList) {
- numOfFields = tupleList.get(0).getArity();
- numOfTuples = tupleList.size();
- this.batchSize = numOfTuples;
+ this.batchSize = tupleList.size();
tupleBatch = new ArrayList(tupleList);
}
- /**
- * Given a Tuple, creates a new a record batch containing the Tuple as its
- * only element
- *
- * @param tuple
- * Tuple to be pushed to the record
- */
- public StreamRecord(Tuple tuple) {
- this(tuple, 1);
- }
-
- /**
- * Checks whether the record batch is empty
- *
- * @return true if the batch is empty, false if it contains Tuples
- */
- public boolean isEmpty() {
- return (this.numOfTuples == 0);
- }
-
- /**
- * Remove all the contents inside StreamRecord.
- */
- public void Clear() {
- this.numOfTuples = 0;
- tupleBatch.clear();
- }
-
- /**
- * @return Number of fields in the tuples
- */
- public int getNumOfFields() {
- return numOfFields;
+ public void setTupleTypeInfo(TupleTypeInfo typeInfo) {
+ this.typeInfo = typeInfo;
+ tupleSerializer = (TupleSerializer) typeInfo.createSerializer();
+ serializationDelegate = new SerializationDelegate(tupleSerializer);
}
/**
* @return Number of tuples in the batch
*/
- public int getNumOfTuples() {
- return numOfTuples;
+ public int getBatchSize() {
+ return batchSize;
}
/**
@@ -224,7 +148,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
for (int i = 0; i < batchSize; i++) {
tupleBatch.add(null);
}
- numOfTuples = batchSize;
}
/**
@@ -236,668 +159,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return (Iterable) tupleBatch;
}
- /**
- * Returns the value of a field in the given position of the first tuple in
- * the batch as an object, cast needed to obtain a typed version
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Object getField(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getField(0, fieldNumber);
- }
-
- /**
- * Returns the value of a field in the given position of a specific tuple in
- * the batch as an object, cast needed to obtain a typed version
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- Tuple tuple;
- try {
- tuple = tupleBatch.get(tupleNumber);
- } catch (IndexOutOfBoundsException e) {
- throw (new NoSuchTupleException());
- }
- try {
- return tuple.getField(fieldNumber);
- } catch (IndexOutOfBoundsException e) {
- throw (new NoSuchFieldException());
- }
- }
-
- /**
- * Get a Boolean from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Boolean
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Boolean getBoolean(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getBoolean(0, fieldNumber);
- }
-
- /**
- * Get a Boolean from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Boolean
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- // TODO: add exception for cast for all getters
- public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Boolean) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Byte from thne given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Byte
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Byte getByte(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getByte(0, fieldNumber);
- }
-
- /**
- * Get a Byte from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Byte
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Byte) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Character from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Character
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Character getCharacter(int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- return getCharacter(0, fieldNumber);
- }
-
- /**
- * Get a Character from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Character
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Character) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Double from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Double
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Double getDouble(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getDouble(0, fieldNumber);
- }
-
- /**
- * Get a Double from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Double
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Double) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Float from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Float
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Float getFloat(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getFloat(0, fieldNumber);
- }
-
- /**
- * Get a Float from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Float
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Float) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get an Integer from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Integer
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Integer getInteger(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getInteger(0, fieldNumber);
- }
-
- /**
- * Get an Integer from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Integer
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Integer) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Long from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Long
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Long getLong(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getLong(0, fieldNumber);
- }
-
- /**
- * Get a Long from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Long
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Long) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a Short from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Short
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Short getShort(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getShort(0, fieldNumber);
- }
-
- /**
- * Get a Short from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as Short
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (Short) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Get a String from the given field of the first Tuple of the batch
- *
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as String
- * @throws NoSuchTupleException
- * , NoSuchFieldException
- */
- public String getString(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
- return getString(0, fieldNumber);
- }
-
- /**
- * Get a String from the given field of the specified Tuple of the batch
- *
- * @param tupleNumber
- * Position of the tuple in the batch
- * @param fieldNumber
- * Position of the field in the tuple
- * @return value of the field as String
- */
- public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
- NoSuchFieldException {
- try {
- return (String) getField(tupleNumber, fieldNumber);
- } catch (ClassCastException e) {
- throw new FieldTypeMismatchException();
- }
- }
-
- /**
- * Sets a field in the given position of the first record in the batch
- *
- * @param fieldNumber
- * Position of the field in the record
- * @param o
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setField(int fieldNumber, Object o) throws NoSuchFieldException {
- setField(0, fieldNumber, o);
- }
-
- /**
- * Sets a field in the given position of a specific tuple in the batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param o
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- // TODO: consider interaction with batch size
- public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
- try {
- tupleBatch.get(tupleNumber).setField(o, fieldNumber);
- } catch (IndexOutOfBoundsException e) {
- throw (new NoSuchTupleException());
- }
- }
-
- /**
- * Sets a Boolean field in the given position of the first tuple in the
- * batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param b
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setBoolean(int fieldNumber, Boolean b) throws NoSuchFieldException {
- setBoolean(0, fieldNumber, b);
- }
-
- /**
- * Sets a Boolean field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param b
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, b);
- }
-
- /**
- * Sets a Byte field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param b
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setByte(int fieldNumber, Byte b) throws NoSuchFieldException {
- setByte(0, fieldNumber, b);
- }
-
- /**
- * Sets a Byte field in the given position of a specific tuple in the batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param b
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setByte(int tupleNumber, int fieldNumber, Byte b) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, b);
- }
-
- /**
- * Sets a Character field in the given position of the first tuple in the
- * batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param c
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setCharacter(int fieldNumber, Character c) throws NoSuchFieldException {
- setCharacter(0, fieldNumber, c);
- }
-
- /**
- * Sets a Character field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param c
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setCharacter(int tupleNumber, int fieldNumber, Character c)
- throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, c);
- }
-
- /**
- * Sets a Double field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param d
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setDouble(int fieldNumber, Double d) throws NoSuchFieldException {
- setDouble(0, fieldNumber, d);
- }
-
- /**
- * Sets a Double field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param d
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setDouble(int tupleNumber, int fieldNumber, Double d) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, d);
- }
-
- /**
- * Sets a Float field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param f
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setFloat(int fieldNumber, Float f) throws NoSuchFieldException {
- setFloat(0, fieldNumber, f);
- }
-
- /**
- * Sets a Double field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param f
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setFloat(int tupleNumber, int fieldNumber, Float f) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, f);
- }
-
- /**
- * Sets an Integer field in the given position of the first tuple in the
- * batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param i
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException {
- setInteger(0, fieldNumber, i);
- }
-
- /**
- * Sets an Integer field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param i
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setInteger(int tupleNumber, int fieldNumber, Integer i) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, i);
- }
-
- /**
- * Sets a Long field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param l
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setLong(int fieldNumber, Long l) throws NoSuchFieldException {
- setLong(0, fieldNumber, l);
- }
-
- /**
- * Sets a Long field in the given position of a specific tuple in the batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param l
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setLong(int tupleNumber, int fieldNumber, Long l) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, l);
- }
-
- /**
- * Sets a Short field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param s
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setShort(int fieldNumber, Short s) throws NoSuchFieldException {
- setShort(0, fieldNumber, s);
- }
-
- /**
- * Sets a Short field in the given position of a specific tuple in the batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param s
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setShort(int tupleNumber, int fieldNumber, Short s) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, s);
- }
-
- /**
- * Sets a String field in the given position of the first tuple in the batch
- *
- * @param fieldNumber
- * Position of field in tuple
- * @param str
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setString(int fieldNumber, String str) throws NoSuchFieldException {
- setField(0, fieldNumber, str);
- }
-
- /**
- * Sets a String field in the given position of a specific tuple in the
- * batch
- *
- * @param tupleNumber
- * Position of tuple in batch
- * @param fieldNumber
- * Position of field in tuple
- * @param str
- * New value
- * @throws NoSuchFieldException
- * the Tuple does not have this many fields
- */
- public void setString(int tupleNumber, int fieldNumber, String str) throws NoSuchFieldException {
- setField(tupleNumber, fieldNumber, str);
- }
-
- /**
- * @return First tuple of the batch
- * @throws NoSuchTupleException
- * the StreamRecord does not have this many tuples
- */
- public Tuple getTuple() throws NoSuchTupleException {
- return getTuple(0);
- }
-
/**
* @param tupleNumber
* Position of the record in the batch
@@ -913,60 +174,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
- /**
- * Gets the fields of the first tuple of the batch into the parameter tuple
- *
- * @param tuple
- * Target tuple
- * @throws NoSuchTupleException
- * , TupleSizeMismatchException
- */
- public void getTupleInto(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
- getTupleInto(0, tuple);
- }
-
- /**
- * Gets the fields of the specified tuple of the batch into the parameter
- * tuple
- *
- * @param tupleNumber
- * Position of the tuple to be written out
- *
- * @param tuple
- * Target tuple
- * @throws NoSuchTupleException
- * , TupleSizeMismatchException
- */
- public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
- TupleSizeMismatchException {
-
- if (tuple.getArity() == numOfFields) {
- try {
- Tuple source = tupleBatch.get(tupleNumber);
- for (int i = 0; i < numOfFields; i++) {
- tuple.setField(source.getField(i), i);
- }
- } catch (IndexOutOfBoundsException e) {
- throw (new NoSuchTupleException());
- }
- } else {
- throw (new TupleSizeMismatchException());
- }
-
- }
-
- /**
- * Sets the first tuple in the batch with the given tuple
- *
- * @param tuple
- * Tuple to set
- * @throws NoSuchTupleException
- * , TupleSizeMismatchException
- */
- public void setTuple(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
- setTuple(0, tuple);
- }
-
/**
* Sets a tuple at the given position in the batch with the given tuple
*
@@ -977,87 +184,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
- public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
- TupleSizeMismatchException {
- if (tuple.getArity() == numOfFields) {
- try {
- tupleBatch.set(tupleNumber, tuple);
- } catch (IndexOutOfBoundsException e) {
- throw (new NoSuchTupleException());
- }
- } else {
- throw (new TupleSizeMismatchException());
- }
- }
-
- /**
- * Checks if the number of fields are equal to the batch field size then
- * adds the Tuple to the end of the batch
- *
- * @param tuple
- * Tuple to be added as the next record of the batch
- * @throws TupleSizeMismatchException
- * Tuple specified has illegal size
- */
- public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
- addTuple(numOfTuples, tuple);
- }
-
- /**
- * Checks if the number of fields are equal to the batch field size then
- * inserts the Tuple to the given position into the recordbatch
- *
- * @param index
- * Position of the added tuple
- * @param tuple
- * Tuple to be added as the next record of the batch
- * @throws TupleSizeMismatchException
- * Tuple specified has illegal size
- */
- public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
- if (tuple.getArity() == numOfFields) {
- tupleBatch.add(index, tuple);
- numOfTuples++;
- } else {
- throw new TupleSizeMismatchException();
- }
- }
-
- /**
- * Removes the tuple at the given position from the batch and returns it
- *
- * @param index
- * Index of tuple to remove
- * @return Removed tuple
- * @throws TupleSizeMismatchException
- * Tuple specified has illegal size
- */
- public Tuple removeTuple(int index) throws TupleSizeMismatchException {
- if (index < numOfTuples) {
- numOfTuples--;
- return tupleBatch.remove(index);
- } else {
- throw new TupleSizeMismatchException();
+ public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
+ try {
+ tupleBatch.set(tupleNumber, tuple);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
}
- }
-
- /**
- * Creates a copy of the StreamRecord object by Serializing and
- * deserializing it
- *
- * @return copy of the StreamRecord
- * @throws IOException
- * Write or read failed
- */
- public StreamRecord copySerialized() throws IOException {
- ByteArrayOutputStream buff = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(buff);
- StreamRecord newRecord = new StreamRecord();
- this.write(out);
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
- newRecord.read(in);
- return newRecord;
}
/**
@@ -1067,7 +200,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
*/
public StreamRecord copy() {
- StreamRecord newRecord = new StreamRecord(numOfFields, numOfTuples);
+ StreamRecord newRecord = new StreamRecord(batchSize);
newRecord.uid = new UID(Arrays.copyOf(uid.getId(), 20));
@@ -1139,162 +272,27 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
- * copy tuples from the given record and append them to the end.
- *
- * @param record
- * record to be appended
+ * Creates a String representation as a list of tuples
*/
- public void appendRecord(StreamRecord record) {
- for (int i = 0; i < record.getNumOfTuples(); ++i) {
- this.addTuple(record.getTuple(i));
- }
- }
+ public String toString() {
+ StringBuilder outputString = new StringBuilder("[");
- /**
- * Converts tuple field types to a byte array
- *
- * @param tuple
- * @return byte array representing types
- */
- byte[] tupleTypesToByteArray(Tuple tuple) {
- byte[] typeNums = new byte[numOfFields];
- for (int i = 0; i < typeNums.length; i++) {
- Class extends Object> type = tuple.getField(i).getClass();
- if (type.equals(Boolean.class)) {
- typeNums[i] = 0;
- } else if (type.equals(Byte.class)) {
- typeNums[i] = 1;
- } else if (type.equals(Character.class)) {
- typeNums[i] = 2;
- } else if (type.equals(Double.class)) {
- typeNums[i] = 3;
- } else if (type.equals(Float.class)) {
- typeNums[i] = 4;
- } else if (type.equals(Integer.class)) {
- typeNums[i] = 5;
- } else if (type.equals(Long.class)) {
- typeNums[i] = 6;
- } else if (type.equals(Short.class)) {
- typeNums[i] = 7;
- } else if (type.equals(String.class)) {
- typeNums[i] = 8;
- } else if (type.equals(Boolean[].class)) {
- typeNums[i] = 9;
- } else if (type.equals(Byte[].class)) {
- typeNums[i] = 10;
- } else if (type.equals(Character[].class)) {
- typeNums[i] = 11;
- } else if (type.equals(Double[].class)) {
- typeNums[i] = 12;
- } else if (type.equals(Float[].class)) {
- typeNums[i] = 13;
- } else if (type.equals(Integer[].class)) {
- typeNums[i] = 14;
- } else if (type.equals(Long[].class)) {
- typeNums[i] = 15;
- } else if (type.equals(Short[].class)) {
- typeNums[i] = 16;
- } else if (type.equals(String[].class)) {
- typeNums[i] = 17;
- }
- }
- return typeNums;
- }
+ String prefix = "";
- /**
- * Gets tuple field types from a byte array
- *
- * @param byte array representing types
- * @param numberOfFields
- * @return TypeInfo array of field types
- */
- @SuppressWarnings("rawtypes")
- TypeInformation[] tupleTypesFromByteArray(byte[] representation) {
- TypeInformation[] basicTypes = new TypeInformation[representation.length];
- for (int i = 0; i < basicTypes.length; i++) {
- switch (representation[i]) {
- case 0:
- basicTypes[i] = BasicTypeInfo.BOOLEAN_TYPE_INFO;
- break;
- case 1:
- basicTypes[i] = BasicTypeInfo.BYTE_TYPE_INFO;
- break;
- case 2:
- basicTypes[i] = BasicTypeInfo.CHAR_TYPE_INFO;
- break;
- case 3:
- basicTypes[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
- break;
- case 4:
- basicTypes[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
- break;
- case 5:
- basicTypes[i] = BasicTypeInfo.INT_TYPE_INFO;
- break;
- case 6:
- basicTypes[i] = BasicTypeInfo.LONG_TYPE_INFO;
- break;
- case 7:
- basicTypes[i] = BasicTypeInfo.SHORT_TYPE_INFO;
- break;
- case 8:
- basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
- break;
- case 9:
- basicTypes[i] = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
- break;
- case 10:
- basicTypes[i] = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
- break;
- case 11:
- basicTypes[i] = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
- break;
- case 12:
- basicTypes[i] = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
- break;
- case 13:
- basicTypes[i] = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
- break;
- case 14:
- basicTypes[i] = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
- break;
- case 15:
- basicTypes[i] = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
- break;
- case 16:
- basicTypes[i] = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
- break;
- case 17:
- basicTypes[i] = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
- break;
- default:
- basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
- break;
- }
+ for (Tuple tuple : tupleBatch) {
+ outputString.append(prefix);
+ prefix = ",";
+ outputString.append(tuple.toString());
}
- return basicTypes;
+ outputString.append("]");
+ return outputString.toString();
}
- /**
- * Write method definition for the IOReadableWritable interface
- */
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
-
- out.writeByte(numOfFields);
- out.writeInt(numOfTuples);
-
- byte[] typesInByte = tupleTypesToByteArray(getTuple());
- out.write(typesInByte);
-
- TupleTypeInfo typeInfo = new TupleTypeInfo(
- tupleTypesFromByteArray(typesInByte));
- TupleSerializer tupleSerializer = (TupleSerializer) typeInfo
- .createSerializer();
- SerializationDelegate serializationDelegate = new SerializationDelegate(
- tupleSerializer);
+ out.writeInt(batchSize);
for (Tuple tuple : tupleBatch) {
serializationDelegate.setInstance(tuple);
@@ -1309,43 +307,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public void read(DataInput in) throws IOException {
uid = new UID();
uid.read(in);
-
- numOfFields = in.readByte();
- numOfTuples = in.readInt();
-
- tupleBatch = new ArrayList(numOfTuples);
-
- byte[] typesInByte = new byte[numOfFields];
- in.readFully(typesInByte, 0, numOfFields);
-
- TupleTypeInfo typeInfo = new TupleTypeInfo(
- tupleTypesFromByteArray(typesInByte));
- TupleSerializer tupleSerializer = typeInfo.createSerializer();
+ batchSize = in.readInt();
+ tupleBatch = new ArrayList(batchSize);
DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer);
- for (int k = 0; k < numOfTuples; ++k) {
+ for (int k = 0; k < batchSize; ++k) {
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
- tupleBatch.add(dd.getInstance());
+ tupleBatch.set(k, dd.getInstance());
}
}
- /**
- * Creates a String representation as a list of tuples
- */
- public String toString() {
- StringBuilder outputString = new StringBuilder("[");
-
- String prefix = "";
-
- for (Tuple tuple : tupleBatch) {
- outputString.append(prefix);
- prefix = ",";
- outputString.append(tuple.toString());
- }
- outputString.append("]");
- return outputString.toString();
- }
-
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordGeneric.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordGeneric.java
new file mode 100755
index 0000000000000000000000000000000000000000..b7d080b5aa4b734fc2b7e1a8e89b0ff8265bb68f
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordGeneric.java
@@ -0,0 +1,1351 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.api.streamrecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple10;
+import eu.stratosphere.api.java.tuple.Tuple11;
+import eu.stratosphere.api.java.tuple.Tuple12;
+import eu.stratosphere.api.java.tuple.Tuple13;
+import eu.stratosphere.api.java.tuple.Tuple14;
+import eu.stratosphere.api.java.tuple.Tuple15;
+import eu.stratosphere.api.java.tuple.Tuple16;
+import eu.stratosphere.api.java.tuple.Tuple17;
+import eu.stratosphere.api.java.tuple.Tuple18;
+import eu.stratosphere.api.java.tuple.Tuple19;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple20;
+import eu.stratosphere.api.java.tuple.Tuple21;
+import eu.stratosphere.api.java.tuple.Tuple22;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.tuple.Tuple6;
+import eu.stratosphere.api.java.tuple.Tuple7;
+import eu.stratosphere.api.java.tuple.Tuple8;
+import eu.stratosphere.api.java.tuple.Tuple9;
+import eu.stratosphere.api.java.typeutils.BasicArrayTypeInfo;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
+import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
+import eu.stratosphere.types.TypeInformation;
+
+/**
+ * Object for storing serializable records in batch (single records are
+ * represented batches with one element) used for sending records between task
+ * objects in Stratosphere stream processing. The elements of the batch are
+ * Tuples.
+ */
+public class StreamRecordGeneric implements IOReadableWritable, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ protected List tupleBatch;
+ private UID uid = new UID();
+ private int numOfFields;
+ private int numOfTuples;
+ private int batchSize;
+
+ private static final Class>[] CLASSES = new Class>[] { Tuple1.class, Tuple2.class,
+ Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+ Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+ Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+ Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
+
+ /**
+ * Creates a new empty instance for read
+ */
+ public StreamRecordGeneric() {
+
+ }
+
+ /**
+ * Creates empty StreamRecord with number of fields set
+ *
+ * @param numOfFields
+ * number of fields
+ */
+ public StreamRecordGeneric(int numOfFields) {
+ this.numOfFields = numOfFields;
+ this.numOfTuples = 0;
+ this.batchSize = 1;
+ tupleBatch = new ArrayList(batchSize);
+ }
+
+ /**
+ * Creates empty StreamRecord with number of fields and batch size set
+ *
+ * @param numOfFields
+ * Number of fields in the tuples
+ * @param batchSize
+ * Batch size
+ */
+ public StreamRecordGeneric(int numOfFields, int batchSize) {
+ this.numOfFields = numOfFields;
+ this.numOfTuples = 0;
+ this.batchSize = batchSize;
+ tupleBatch = new ArrayList(batchSize);
+ }
+
+ public StreamRecordGeneric(StreamRecordGeneric record) {
+ this.numOfFields = record.getNumOfFields();
+ this.numOfTuples = 0;
+ tupleBatch = new ArrayList();
+ this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
+ for (int i = 0; i < record.getNumOfTuples(); ++i) {
+ this.tupleBatch.add(copyTuple(record.getTuple(i)));
+ }
+ }
+
+ /**
+ * Creates a new batch of records containing only the given Tuple as element
+ * and sets desired batch size.
+ *
+ * @param tuple
+ * Tuple to be pushed to the record
+ * @param batchSize
+ * Number of tuples in the record
+ */
+ public StreamRecordGeneric(T tuple, int batchSize) {
+ numOfFields = tuple.getArity();
+ numOfTuples = 1;
+ this.batchSize = batchSize;
+ tupleBatch = new ArrayList(batchSize);
+ tupleBatch.add(tuple);
+ }
+
+ /**
+ * Creates a new batch of records containing the given Tuple list as
+ * elements
+ *
+ * @param tupleList
+ * Tuples to bes stored in the StreamRecord
+ */
+ public StreamRecordGeneric(List tupleList) {
+ numOfFields = tupleList.get(0).getArity();
+ numOfTuples = tupleList.size();
+ this.batchSize = numOfTuples;
+ tupleBatch = new ArrayList(tupleList);
+ }
+
+ /**
+ * Given a Tuple, creates a new a record batch containing the Tuple as its
+ * only element
+ *
+ * @param tuple
+ * Tuple to be pushed to the record
+ */
+ public StreamRecordGeneric(T tuple) {
+ this(tuple, 1);
+ }
+
+ /**
+ * Checks whether the record batch is empty
+ *
+ * @return true if the batch is empty, false if it contains Tuples
+ */
+ public boolean isEmpty() {
+ return (this.numOfTuples == 0);
+ }
+
+ /**
+ * Remove all the contents inside StreamRecord.
+ */
+ public void Clear() {
+ this.numOfTuples = 0;
+ tupleBatch.clear();
+ }
+
+ /**
+ * @return Number of fields in the tuples
+ */
+ public int getNumOfFields() {
+ return numOfFields;
+ }
+
+ /**
+ * @return Number of tuples in the batch
+ */
+ public int getNumOfTuples() {
+ return numOfTuples;
+ }
+
+ /**
+ * @return The ID of the object
+ */
+ public UID getId() {
+ return uid;
+ }
+
+ /**
+ * Set the ID of the StreamRecord object
+ *
+ * @param channelID
+ * ID of the emitting task
+ * @return The StreamRecord object
+ */
+ public StreamRecordGeneric setId(int channelID) {
+ uid = new UID(channelID);
+ return this;
+ }
+
+ /**
+ * Initializes the record batch elemnts to null
+ */
+ public void initRecords() {
+ tupleBatch.clear();
+ for (int i = 0; i < batchSize; i++) {
+ tupleBatch.add(null);
+ }
+ numOfTuples = batchSize;
+ }
+
+ /**
+ * Returns an iterable over the tuplebatch
+ *
+ * @return batch iterable
+ */
+ public Iterable getBatchIterable() {
+ return (Iterable) tupleBatch;
+ }
+
+ /**
+ * Returns the value of a field in the given position of the first tuple in
+ * the batch as an object, cast needed to obtain a typed version
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Object getField(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getField(0, fieldNumber);
+ }
+
+ /**
+ * Returns the value of a field in the given position of a specific tuple in
+ * the batch as an object, cast needed to obtain a typed version
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ Tuple tuple;
+ try {
+ tuple = tupleBatch.get(tupleNumber);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ try {
+ return tuple.getField(fieldNumber);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchFieldException());
+ }
+ }
+
+ /**
+ * Get a Boolean from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Boolean
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Boolean getBoolean(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getBoolean(0, fieldNumber);
+ }
+
+ /**
+ * Get a Boolean from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Boolean
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ // TODO: add exception for cast for all getters
+ public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Boolean) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Byte from thne given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Byte
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Byte getByte(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getByte(0, fieldNumber);
+ }
+
+ /**
+ * Get a Byte from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Byte
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Byte getByte(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Byte) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Character from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Character
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Character getCharacter(int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ return getCharacter(0, fieldNumber);
+ }
+
+ /**
+ * Get a Character from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Character
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Character getCharacter(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Character) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Double from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Double
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Double getDouble(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getDouble(0, fieldNumber);
+ }
+
+ /**
+ * Get a Double from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Double
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Double) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Float from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Float
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Float getFloat(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getFloat(0, fieldNumber);
+ }
+
+ /**
+ * Get a Float from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Float
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Float getFloat(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Float) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get an Integer from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Integer
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Integer getInteger(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getInteger(0, fieldNumber);
+ }
+
+ /**
+ * Get an Integer from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Integer
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Integer) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Long from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Long
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Long getLong(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getLong(0, fieldNumber);
+ }
+
+ /**
+ * Get a Long from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Long
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Long) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a Short from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Short
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Short getShort(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getShort(0, fieldNumber);
+ }
+
+ /**
+ * Get a Short from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as Short
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public Short getShort(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (Short) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Get a String from the given field of the first Tuple of the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as String
+ * @throws NoSuchTupleException
+ * , NoSuchFieldException
+ */
+ public String getString(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
+ return getString(0, fieldNumber);
+ }
+
+ /**
+ * Get a String from the given field of the specified Tuple of the batch
+ *
+ * @param tupleNumber
+ * Position of the tuple in the batch
+ * @param fieldNumber
+ * Position of the field in the tuple
+ * @return value of the field as String
+ */
+ public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
+ NoSuchFieldException {
+ try {
+ return (String) getField(tupleNumber, fieldNumber);
+ } catch (ClassCastException e) {
+ throw new FieldTypeMismatchException();
+ }
+ }
+
+ /**
+ * Sets a field in the given position of the first record in the batch
+ *
+ * @param fieldNumber
+ * Position of the field in the record
+ * @param o
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setField(int fieldNumber, Object o) throws NoSuchFieldException {
+ setField(0, fieldNumber, o);
+ }
+
+ /**
+ * Sets a field in the given position of a specific tuple in the batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param o
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ // TODO: consider interaction with batch size
+ public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
+ try {
+ tupleBatch.get(tupleNumber).setField(o, fieldNumber);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ }
+
+ /**
+ * Sets a Boolean field in the given position of the first tuple in the
+ * batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param b
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setBoolean(int fieldNumber, Boolean b) throws NoSuchFieldException {
+ setBoolean(0, fieldNumber, b);
+ }
+
+ /**
+ * Sets a Boolean field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param b
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, b);
+ }
+
+ /**
+ * Sets a Byte field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param b
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setByte(int fieldNumber, Byte b) throws NoSuchFieldException {
+ setByte(0, fieldNumber, b);
+ }
+
+ /**
+ * Sets a Byte field in the given position of a specific tuple in the batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param b
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setByte(int tupleNumber, int fieldNumber, Byte b) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, b);
+ }
+
+ /**
+ * Sets a Character field in the given position of the first tuple in the
+ * batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param c
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setCharacter(int fieldNumber, Character c) throws NoSuchFieldException {
+ setCharacter(0, fieldNumber, c);
+ }
+
+ /**
+ * Sets a Character field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param c
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setCharacter(int tupleNumber, int fieldNumber, Character c)
+ throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, c);
+ }
+
+ /**
+ * Sets a Double field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param d
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setDouble(int fieldNumber, Double d) throws NoSuchFieldException {
+ setDouble(0, fieldNumber, d);
+ }
+
+ /**
+ * Sets a Double field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param d
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setDouble(int tupleNumber, int fieldNumber, Double d) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, d);
+ }
+
+ /**
+ * Sets a Float field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param f
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setFloat(int fieldNumber, Float f) throws NoSuchFieldException {
+ setFloat(0, fieldNumber, f);
+ }
+
+ /**
+ * Sets a Double field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param f
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setFloat(int tupleNumber, int fieldNumber, Float f) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, f);
+ }
+
+ /**
+ * Sets an Integer field in the given position of the first tuple in the
+ * batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param i
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException {
+ setInteger(0, fieldNumber, i);
+ }
+
+ /**
+ * Sets an Integer field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param i
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setInteger(int tupleNumber, int fieldNumber, Integer i) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, i);
+ }
+
+ /**
+ * Sets a Long field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param l
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setLong(int fieldNumber, Long l) throws NoSuchFieldException {
+ setLong(0, fieldNumber, l);
+ }
+
+ /**
+ * Sets a Long field in the given position of a specific tuple in the batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param l
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setLong(int tupleNumber, int fieldNumber, Long l) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, l);
+ }
+
+ /**
+ * Sets a Short field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param s
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setShort(int fieldNumber, Short s) throws NoSuchFieldException {
+ setShort(0, fieldNumber, s);
+ }
+
+ /**
+ * Sets a Short field in the given position of a specific tuple in the batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param s
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setShort(int tupleNumber, int fieldNumber, Short s) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, s);
+ }
+
+ /**
+ * Sets a String field in the given position of the first tuple in the batch
+ *
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param str
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setString(int fieldNumber, String str) throws NoSuchFieldException {
+ setField(0, fieldNumber, str);
+ }
+
+ /**
+ * Sets a String field in the given position of a specific tuple in the
+ * batch
+ *
+ * @param tupleNumber
+ * Position of tuple in batch
+ * @param fieldNumber
+ * Position of field in tuple
+ * @param str
+ * New value
+ * @throws NoSuchFieldException
+ * the Tuple does not have this many fields
+ */
+ public void setString(int tupleNumber, int fieldNumber, String str) throws NoSuchFieldException {
+ setField(tupleNumber, fieldNumber, str);
+ }
+
+ /**
+ * @return First tuple of the batch
+ * @throws NoSuchTupleException
+ * the StreamRecord does not have this many tuples
+ */
+ public T getTuple() throws NoSuchTupleException {
+ return getTuple(0);
+ }
+
+ /**
+ * @param tupleNumber
+ * Position of the record in the batch
+ * @return Chosen tuple
+ * @throws NoSuchTupleException
+ * the Tuple does not have this many fields
+ */
+ public T getTuple(int tupleNumber) throws NoSuchTupleException {
+ try {
+ return tupleBatch.get(tupleNumber);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ }
+
+ /**
+ * Gets the fields of the first tuple of the batch into the parameter tuple
+ *
+ * @param tuple
+ * Target tuple
+ * @throws NoSuchTupleException
+ * , TupleSizeMismatchException
+ */
+ public void getTupleInto(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
+ getTupleInto(0, tuple);
+ }
+
+ /**
+ * Gets the fields of the specified tuple of the batch into the parameter
+ * tuple
+ *
+ * @param tupleNumber
+ * Position of the tuple to be written out
+ *
+ * @param tuple
+ * Target tuple
+ * @throws NoSuchTupleException
+ * , TupleSizeMismatchException
+ */
+ public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
+ TupleSizeMismatchException {
+
+ if (tuple.getArity() == numOfFields) {
+ try {
+ Tuple source = tupleBatch.get(tupleNumber);
+ for (int i = 0; i < numOfFields; i++) {
+ tuple.setField(source.getField(i), i);
+ }
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ } else {
+ throw (new TupleSizeMismatchException());
+ }
+
+ }
+
+ /**
+ * Sets the first tuple in the batch with the given tuple
+ *
+ * @param tuple
+ * Tuple to set
+ * @throws NoSuchTupleException
+ * , TupleSizeMismatchException
+ */
+ public void setTuple(T tuple) throws NoSuchTupleException, TupleSizeMismatchException {
+ setTuple(0, tuple);
+ }
+
+ /**
+ * Sets a tuple at the given position in the batch with the given tuple
+ *
+ * @param tupleNumber
+ * Position of tuple in the batch
+ * @param tuple
+ * Value to set
+ * @throws NoSuchTupleException
+ * , TupleSizeMismatchException
+ */
+ public void setTuple(int tupleNumber, T tuple) throws NoSuchTupleException,
+ TupleSizeMismatchException {
+ if (tuple.getArity() == numOfFields) {
+ try {
+ tupleBatch.set(tupleNumber, tuple);
+ } catch (IndexOutOfBoundsException e) {
+ throw (new NoSuchTupleException());
+ }
+ } else {
+ throw (new TupleSizeMismatchException());
+ }
+ }
+
+ /**
+ * Checks if the number of fields are equal to the batch field size then
+ * adds the Tuple to the end of the batch
+ *
+ * @param tuple
+ * Tuple to be added as the next record of the batch
+ * @throws TupleSizeMismatchException
+ * Tuple specified has illegal size
+ */
+ public void addTuple(T tuple) throws TupleSizeMismatchException {
+ addTuple(numOfTuples, tuple);
+ }
+
+ /**
+ * Checks if the number of fields are equal to the batch field size then
+ * inserts the Tuple to the given position into the recordbatch
+ *
+ * @param index
+ * Position of the added tuple
+ * @param tuple
+ * Tuple to be added as the next record of the batch
+ * @throws TupleSizeMismatchException
+ * Tuple specified has illegal size
+ */
+ public void addTuple(int index, T tuple) throws TupleSizeMismatchException {
+ if (tuple.getArity() == numOfFields) {
+ tupleBatch.add(index, tuple);
+ numOfTuples++;
+ } else {
+ throw new TupleSizeMismatchException();
+ }
+ }
+
+ /**
+ * Removes the tuple at the given position from the batch and returns it
+ *
+ * @param index
+ * Index of tuple to remove
+ * @return Removed tuple
+ * @throws TupleSizeMismatchException
+ * Tuple specified has illegal size
+ */
+ public Tuple removeTuple(int index) throws TupleSizeMismatchException {
+ if (index < numOfTuples) {
+ numOfTuples--;
+ return tupleBatch.remove(index);
+ } else {
+ throw new TupleSizeMismatchException();
+ }
+ }
+
+ /**
+ * Creates a copy of the StreamRecord object by Serializing and
+ * deserializing it
+ *
+ * @return copy of the StreamRecord
+ * @throws IOException
+ * Write or read failed
+ */
+ public StreamRecordGeneric copySerialized() throws IOException {
+ ByteArrayOutputStream buff = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(buff);
+ StreamRecordGeneric newRecord = new StreamRecordGeneric();
+ this.write(out);
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
+
+ newRecord.read(in);
+ return newRecord;
+ }
+
+ /**
+ * Creates a deep copy of the StreamRecord
+ *
+ * @return Copy of the StreamRecord
+ *
+ */
+ public StreamRecordGeneric copy() {
+ StreamRecordGeneric newRecord = new StreamRecordGeneric(numOfFields, numOfTuples);
+
+ newRecord.uid = new UID(Arrays.copyOf(uid.getId(), 20));
+
+ for (T tuple : tupleBatch) {
+ newRecord.tupleBatch.add(copyTuple(tuple));
+ }
+
+ return newRecord;
+ }
+
+ /**
+ * Creates deep copy of Tuple
+ *
+ * @param tuple
+ * Tuple to copy
+ * @return Copy of the tuple
+ */
+ public T copyTuple(T tuple) {
+ // TODO: implement deep copy for arrays
+ int numofFields = tuple.getArity();
+ T newTuple = null;
+ try {
+ newTuple = (T) CLASSES[numofFields - 1].newInstance();
+
+ for (int i = 0; i < numofFields; i++) {
+ Class extends Object> type = tuple.getField(i).getClass();
+ if (type.isArray()) {
+ if (type.equals(Boolean[].class)) {
+ Boolean[] arr = (Boolean[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Byte[].class)) {
+ Byte[] arr = (Byte[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Character[].class)) {
+ Character[] arr = (Character[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Double[].class)) {
+ Double[] arr = (Double[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Float[].class)) {
+ Float[] arr = (Float[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Integer[].class)) {
+ Integer[] arr = (Integer[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Long[].class)) {
+ Long[] arr = (Long[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(Short[].class)) {
+ Short[] arr = (Short[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ } else if (type.equals(String[].class)) {
+ String[] arr = (String[]) tuple.getField(i);
+ newTuple.setField(Arrays.copyOf(arr, arr.length), i);
+ }
+ newTuple.setField(tuple.getField(i), i);
+ } else {
+ newTuple.setField(tuple.getField(i), i);
+ }
+ }
+ } catch (InstantiationException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return newTuple;
+ }
+
+ /**
+ * copy tuples from the given record and append them to the end.
+ *
+ * @param record
+ * record to be appended
+ */
+ public void appendRecord(StreamRecordGeneric record) {
+ for (int i = 0; i < record.getNumOfTuples(); ++i) {
+ this.addTuple(record.getTuple(i));
+ }
+ }
+
+ /**
+ * Converts tuple field types to a byte array
+ *
+ * @param tuple
+ * @return byte array representing types
+ */
+ byte[] tupleTypesToByteArray(Tuple tuple) {
+ byte[] typeNums = new byte[numOfFields];
+ for (int i = 0; i < typeNums.length; i++) {
+ Class extends Object> type = tuple.getField(i).getClass();
+ if (type.equals(Boolean.class)) {
+ typeNums[i] = 0;
+ } else if (type.equals(Byte.class)) {
+ typeNums[i] = 1;
+ } else if (type.equals(Character.class)) {
+ typeNums[i] = 2;
+ } else if (type.equals(Double.class)) {
+ typeNums[i] = 3;
+ } else if (type.equals(Float.class)) {
+ typeNums[i] = 4;
+ } else if (type.equals(Integer.class)) {
+ typeNums[i] = 5;
+ } else if (type.equals(Long.class)) {
+ typeNums[i] = 6;
+ } else if (type.equals(Short.class)) {
+ typeNums[i] = 7;
+ } else if (type.equals(String.class)) {
+ typeNums[i] = 8;
+ } else if (type.equals(Boolean[].class)) {
+ typeNums[i] = 9;
+ } else if (type.equals(Byte[].class)) {
+ typeNums[i] = 10;
+ } else if (type.equals(Character[].class)) {
+ typeNums[i] = 11;
+ } else if (type.equals(Double[].class)) {
+ typeNums[i] = 12;
+ } else if (type.equals(Float[].class)) {
+ typeNums[i] = 13;
+ } else if (type.equals(Integer[].class)) {
+ typeNums[i] = 14;
+ } else if (type.equals(Long[].class)) {
+ typeNums[i] = 15;
+ } else if (type.equals(Short[].class)) {
+ typeNums[i] = 16;
+ } else if (type.equals(String[].class)) {
+ typeNums[i] = 17;
+ }
+ }
+ return typeNums;
+ }
+
+ /**
+ * Gets tuple field types from a byte array
+ *
+ * @param byte array representing types
+ * @param numberOfFields
+ * @return TypeInfo array of field types
+ */
+ @SuppressWarnings("rawtypes")
+ TypeInformation[] tupleTypesFromByteArray(byte[] representation) {
+ TypeInformation[] basicTypes = new TypeInformation[representation.length];
+ for (int i = 0; i < basicTypes.length; i++) {
+ switch (representation[i]) {
+ case 0:
+ basicTypes[i] = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ break;
+ case 1:
+ basicTypes[i] = BasicTypeInfo.BYTE_TYPE_INFO;
+ break;
+ case 2:
+ basicTypes[i] = BasicTypeInfo.CHAR_TYPE_INFO;
+ break;
+ case 3:
+ basicTypes[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
+ break;
+ case 4:
+ basicTypes[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
+ break;
+ case 5:
+ basicTypes[i] = BasicTypeInfo.INT_TYPE_INFO;
+ break;
+ case 6:
+ basicTypes[i] = BasicTypeInfo.LONG_TYPE_INFO;
+ break;
+ case 7:
+ basicTypes[i] = BasicTypeInfo.SHORT_TYPE_INFO;
+ break;
+ case 8:
+ basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
+ break;
+ case 9:
+ basicTypes[i] = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
+ break;
+ case 10:
+ basicTypes[i] = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+ break;
+ case 11:
+ basicTypes[i] = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
+ break;
+ case 12:
+ basicTypes[i] = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
+ break;
+ case 13:
+ basicTypes[i] = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
+ break;
+ case 14:
+ basicTypes[i] = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
+ break;
+ case 15:
+ basicTypes[i] = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
+ break;
+ case 16:
+ basicTypes[i] = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
+ break;
+ case 17:
+ basicTypes[i] = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
+ break;
+ default:
+ basicTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
+ break;
+ }
+ }
+ return basicTypes;
+ }
+
+ /**
+ * Write method definition for the IOReadableWritable interface
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+
+ uid.write(out);
+
+ out.writeByte(numOfFields);
+ out.writeInt(numOfTuples);
+
+ byte[] typesInByte = tupleTypesToByteArray(getTuple());
+ out.write(typesInByte);
+
+ TupleTypeInfo typeInfo = new TupleTypeInfo(
+ tupleTypesFromByteArray(typesInByte));
+ TupleSerializer tupleSerializer = (TupleSerializer) typeInfo
+ .createSerializer();
+ SerializationDelegate serializationDelegate = new SerializationDelegate(
+ tupleSerializer);
+
+ for (Tuple tuple : tupleBatch) {
+ serializationDelegate.setInstance(tuple);
+ serializationDelegate.write(out);
+ }
+ }
+
+ /**
+ * Read method definition for the IOReadableWritable interface
+ */
+ @Override
+ public void read(DataInput in) throws IOException {
+ uid = new UID();
+ uid.read(in);
+
+ numOfFields = in.readByte();
+ numOfTuples = in.readInt();
+
+ tupleBatch = new ArrayList(numOfTuples);
+
+ byte[] typesInByte = new byte[numOfFields];
+ in.readFully(typesInByte, 0, numOfFields);
+
+ TupleTypeInfo typeInfo = new TupleTypeInfo(tupleTypesFromByteArray(typesInByte));
+ TupleSerializer tupleSerializer = typeInfo.createSerializer();
+
+ DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer);
+
+ for (int k = 0; k < numOfTuples; ++k) {
+ dd.setInstance(tupleSerializer.createInstance());
+ dd.read(in);
+ tupleBatch.add(dd.getInstance());
+ }
+ }
+
+ /**
+ * Creates a String representation as a list of tuples
+ */
+ public String toString() {
+ StringBuilder outputString = new StringBuilder("[");
+
+ String prefix = "";
+
+ for (Tuple tuple : tupleBatch) {
+ outputString.append(prefix);
+ prefix = ",";
+ outputString.append(tuple.toString());
+ }
+ outputString.append("]");
+ return outputString.toString();
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
index 085b19ad2284d100741acd1a93b30605b5a1c819..9eba70e9b8edc7c26a4bae6479e56faacb303b9a 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
@@ -15,15 +15,17 @@
package eu.stratosphere.streaming.examples.wordcount;
+import java.util.HashMap;
+import java.util.Map;
+
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
- private MutableTableState wordCounts = new MutableTableState();
+ private Map wordCounts = new HashMap();
private String word = "";
private Integer count = 0;
@@ -45,7 +47,7 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setInteger(1, count);
emit(outRecord);
- performanceCounter.count();
+ // performanceCounter.count();
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java
index 641980346edd74307737d9190056aa73d8b85f10..c1d1a031a25d4ab971ac5d3ac36b4d3b423db692 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java
@@ -48,7 +48,7 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
outRecord.setString(0, word);
System.out.println("word=" + word);
emit(outRecord);
- performanceCounter.count();
+ //performanceCounter.count();
}
}
}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java
new file mode 100755
index 0000000000000000000000000000000000000000..54e11b80e56fdafbed535da165e917ffea280846
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java
@@ -0,0 +1,31 @@
+package eu.stratosphere.streaming.api;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+
+public class StreamCollectorTest {
+
+ @Test
+ public void testStreamCollector() {
+ StreamCollector collector = new StreamCollector(10, 0);
+ assertEquals(10, collector.batchSize);
+ }
+
+ @Test
+ public void testCollect() {
+ StreamCollector collector = new StreamCollector(2, 0);
+ collector.collect(new Tuple1(3));
+ collector.collect(new Tuple1(4));
+ collector.collect(new Tuple1(5));
+ collector.collect(new Tuple1(6));
+
+ }
+
+ @Test
+ public void testClose() {
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java
new file mode 100755
index 0000000000000000000000000000000000000000..f23d7147fe7b5905e6a89e48483a4b98c4fcdec7
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java
@@ -0,0 +1,100 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.api.streamrecord;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+import eu.stratosphere.types.TypeInformation;
+
+public class ArrayStreamRecordTest {
+
+ @Test
+ public void constructorTest() {
+ ArrayStreamRecord record = new ArrayStreamRecord(10);
+ assertEquals(10, record.getBatchSize());
+
+ Tuple[] tuples = new Tuple[2];
+ tuples[0] = new Tuple1(2);
+ tuples[1] = new Tuple1(3);
+
+ ArrayStreamRecord record1 = new ArrayStreamRecord(tuples);
+
+ assertEquals(2, record1.getBatchSize());
+
+ ArrayStreamRecord record2 = new ArrayStreamRecord(record1);
+ assertEquals(2, record2.getBatchSize());
+ }
+
+ @Test
+ public void typeExtractTest() throws IOException, ClassNotFoundException {
+
+ ByteArrayOutputStream buff = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buff);
+
+ MyGeneric> g = new MyGeneric2();
+ out.writeObject(g);
+
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buff.toByteArray()));
+
+ MyGeneric> f = (MyGeneric>) in.readObject();
+
+ TypeInformation> ti = TypeExtractor.createTypeInfo(MyGeneric.class, f.getClass(), 0,
+ null, null);
+
+ System.out.println("Type info: " + ti);
+
+ }
+
+ @Test
+ public void StreamRecordSpeedTest() {
+ int len = 1000000;
+ ArrayStreamRecord arecord = new ArrayStreamRecord(len);
+ StreamRecord record = new StreamRecord(len);
+ Tuple2 tuple = new Tuple2(2, "a");
+ long standardTime=System.nanoTime();
+
+ for (int i = 0; i < len; i++) {
+ record.setTuple(i, tuple);
+ }
+ standardTime=System.nanoTime()-standardTime;
+
+ long arrayTime=System.nanoTime();
+ for (int i = 0; i < len; i++) {
+ arecord.setTuple(i, tuple);
+ }
+ arrayTime=System.nanoTime()-arrayTime;
+
+ System.out.println("Standard time: "+standardTime);
+ System.out.println("Array time: "+arrayTime);
+
+ float multi = (float)standardTime/(float)arrayTime;
+ System.out.println("Mulitplier: "+multi);
+
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric.java
new file mode 100755
index 0000000000000000000000000000000000000000..f04ab965032503e67bbe942f52356e755851ae0a
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric.java
@@ -0,0 +1,9 @@
+package eu.stratosphere.streaming.api.streamrecord;
+
+import java.io.Serializable;
+
+public abstract class MyGeneric implements Serializable {
+
+ public abstract void asd();
+
+}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric2.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric2.java
new file mode 100755
index 0000000000000000000000000000000000000000..6729891b4d281a99d1554756f3e0f39283e0ae0c
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/MyGeneric2.java
@@ -0,0 +1,11 @@
+package eu.stratosphere.streaming.api.streamrecord;
+
+public class MyGeneric2 extends MyGeneric{
+
+ @Override
+ public void asd() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
index 5dd046dd071ff89ae223eb80dd52420c800e0c22..d0387c34227eb8614fe860afc734b25cf302d651 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
@@ -15,479 +15,60 @@
package eu.stratosphere.streaming.api.streamrecord;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.api.java.tuple.Tuple3;
-import eu.stratosphere.api.java.tuple.Tuple5;
-import eu.stratosphere.api.java.tuple.Tuple9;
-import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.types.TypeInformation;
public class StreamRecordTest {
@Test
- public void singleRecordSetGetTest() {
- StreamRecord record = new StreamRecord(
- new Tuple9(
- "Stratosphere", 1, 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42));
+ public void constructorTest() {
+ StreamRecord record = new StreamRecord(10);
+ assertEquals(10, record.getBatchSize());
- assertEquals(9, record.getNumOfFields());
- assertEquals(1, record.getNumOfTuples());
+ List tuples = new ArrayList();
+ tuples.add(new Tuple1(2));
+ tuples.add(new Tuple1(3));
- assertEquals("Stratosphere", record.getString(0));
- assertEquals((Integer) 1, record.getInteger(1));
- assertEquals((Long) 2L, record.getLong(2));
- assertEquals(true, record.getBoolean(3));
- assertEquals((Double) 3.5, record.getDouble(4));
- assertEquals((Byte) (byte) 0xa, record.getByte(5));
- assertEquals((Character) 'a', record.getCharacter(6));
- assertEquals((Float) 0.1f, record.getFloat(7));
- assertEquals((Short) (short) 42, record.getShort(8));
+ StreamRecord record1 = new StreamRecord(tuples);
- Tuple9 tuple = new Tuple9();
-
- record.getTupleInto(tuple);
-
- assertEquals("Stratosphere", tuple.getField(0));
- assertEquals((Integer) 1, tuple.getField(1));
- assertEquals((Long) 2L, tuple.getField(2));
- assertEquals(true, tuple.getField(3));
- assertEquals((Double) 3.5, tuple.getField(4));
- assertEquals((Byte) (byte) 0xa, tuple.getField(5));
- assertEquals((Character) 'a', tuple.getField(6));
- assertEquals((Float) 0.1f, tuple.getField(7));
- assertEquals((Short) (short) 42, tuple.getField(8));
-
- record.setString(0, "Streaming");
- record.setInteger(1, 2);
- record.setLong(2, 3L);
- record.setBoolean(3, false);
- record.setDouble(4, 4.5);
- record.setByte(5, (byte) 0xb);
- record.setCharacter(6, 'b');
- record.setFloat(7, 0.2f);
- record.setShort(8, (short) 69);
-
- assertEquals("Streaming", record.getString(0));
- assertEquals((Integer) 2, record.getInteger(1));
- assertEquals((Long) 3L, record.getLong(2));
- assertEquals(false, record.getBoolean(3));
- assertEquals((Double) 4.5, record.getDouble(4));
- assertEquals((Byte) (byte) 0xb, record.getByte(5));
- assertEquals((Character) 'b', record.getCharacter(6));
- assertEquals((Float) 0.2f, record.getFloat(7));
- assertEquals((Short) (short) 69, record.getShort(8));
-
- record.setString(0, 0, "");
- record.setInteger(0, 1, 0);
- record.setLong(0, 2, 0L);
- record.setBoolean(0, 3, false);
- record.setDouble(0, 4, 0.);
- record.setByte(0, 5, (byte) 0x0);
- record.setCharacter(0, 6, '\0');
- record.setFloat(0, 7, 0.f);
- record.setShort(0, 8, (short) 0);
-
- assertEquals("", record.getString(0, 0));
- assertEquals((Integer) 0, record.getInteger(0, 1));
- assertEquals((Long) 0L, record.getLong(0, 2));
- assertEquals(false, record.getBoolean(0, 3));
- assertEquals((Double) 0., record.getDouble(0, 4));
- assertEquals((Byte) (byte) 0x0, record.getByte(0, 5));
- assertEquals((Character) '\0', record.getCharacter(0, 6));
- assertEquals((Float) 0.f, record.getFloat(0, 7));
- assertEquals((Short) (short) 0, record.getShort(0, 8));
-
- }
-
- @Test
- public void batchRecordSetGetTest() {
- StreamRecord record = new StreamRecord(5, 2);
-
- Tuple5 tuple = new Tuple5(
- "Stratosphere", 1, 2L, true, 3.5);
-
- record.addTuple(StreamRecord.copyTuple(tuple));
-
- tuple.setField("", 0);
- tuple.setField(0, 1);
- tuple.setField(0L, 2);
- tuple.setField(false, 3);
- tuple.setField(0., 4);
-
- record.addTuple(tuple);
- try {
- record.addTuple(new Tuple1("4"));
- fail();
- } catch (TupleSizeMismatchException e) {
- }
-
- assertEquals(5, record.getNumOfFields());
- assertEquals(2, record.getNumOfTuples());
-
- assertEquals("Stratosphere", record.getString(0, 0));
- assertEquals((Integer) 1, record.getInteger(0, 1));
- assertEquals((Long) 2L, record.getLong(0, 2));
- assertEquals(true, record.getBoolean(0, 3));
- assertEquals((Double) 3.5, record.getDouble(0, 4));
-
- assertEquals("", record.getString(1, 0));
- assertEquals((Integer) 0, record.getInteger(1, 1));
- assertEquals((Long) 0L, record.getLong(1, 2));
- assertEquals(false, record.getBoolean(1, 3));
- assertEquals((Double) 0., record.getDouble(1, 4));
-
- record.setTuple(new Tuple5("", 0, 0L, false, 0.));
-
- assertEquals(5, record.getNumOfFields());
- assertEquals(2, record.getNumOfTuples());
-
- assertEquals("", record.getString(0, 0));
- assertEquals((Integer) 0, record.getInteger(0, 1));
- assertEquals((Long) 0L, record.getLong(0, 2));
- assertEquals(false, record.getBoolean(0, 3));
- assertEquals((Double) 0., record.getDouble(0, 4));
-
- record.setTuple(1, new Tuple5("Stratosphere", 1,
- 2L, true, 3.5));
-
- assertEquals("Stratosphere", record.getString(1, 0));
- assertEquals((Integer) 1, record.getInteger(1, 1));
- assertEquals((Long) 2L, record.getLong(1, 2));
- assertEquals(true, record.getBoolean(1, 3));
- assertEquals((Double) 3.5, record.getDouble(1, 4));
-
- record.removeTuple(1);
-
- assertEquals(1, record.getNumOfTuples());
-
- assertEquals("", record.getString(0, 0));
- assertEquals((Integer) 0, record.getInteger(0, 1));
- assertEquals((Long) 0L, record.getLong(0, 2));
- assertEquals(false, record.getBoolean(0, 3));
- assertEquals((Double) 0., record.getDouble(0, 4));
-
- record.addTuple(0, new Tuple5("Stratosphere", 1,
- 2L, true, 3.5));
-
- assertEquals(2, record.getNumOfTuples());
-
- assertEquals("Stratosphere", record.getString(0, 0));
- assertEquals((Integer) 1, record.getInteger(0, 1));
- assertEquals((Long) 2L, record.getLong(0, 2));
- assertEquals(true, record.getBoolean(0, 3));
- assertEquals((Double) 3.5, record.getDouble(0, 4));
-
- }
-
- @Test
- public void copyTest() throws IOException {
- StreamRecord a = new StreamRecord(new Tuple1("Big"));
- a.setId(0);
- StreamRecord b = a.copy();
- assertTrue(a.getField(0).equals(b.getField(0)));
- assertTrue(a.getId().equals(b.getId()));
- b.setId(2);
- b.setTuple(new Tuple1("Data"));
- assertFalse(a.getId().equals(b.getId()));
- assertFalse(a.getField(0).equals(b.getField(0)));
- final int ITERATION = 10000;
-
- StreamRecord c = new StreamRecord(new Tuple1("Big"));
-
- long t = System.nanoTime();
- for (int i = 0; i < ITERATION; i++) {
- c.copySerialized();
- }
- long t2 = System.nanoTime() - t;
- System.out.println("Serialized copy:\t" + t2 + " ns");
-
- t = System.nanoTime();
- for (int i = 0; i < ITERATION; i++) {
- c.copy();
- }
- t2 = System.nanoTime() - t;
- System.out.println("Copy:\t" + t2 + " ns");
+ assertEquals(2, record1.getBatchSize());
+ StreamRecord record2 = new StreamRecord(record1);
+ assertEquals(2, record2.getBatchSize());
}
@Test
- public void exceptionTest() {
- StreamRecord a = new StreamRecord(new Tuple1("Big"));
- try {
- a.setTuple(4, new Tuple1("Data"));
- fail();
- } catch (NoSuchTupleException e) {
- }
+ public void typeExtractTest() throws IOException, ClassNotFoundException {
- try {
- a.setTuple(new Tuple2("Data", "Stratosphere"));
- fail();
- } catch (TupleSizeMismatchException e) {
- }
-
- StreamRecord b = new StreamRecord();
- try {
- b.addTuple(new Tuple2("Data", "Stratosphere"));
- fail();
- } catch (TupleSizeMismatchException e) {
- }
-
- try {
- a.getField(3);
- fail();
- } catch (NoSuchFieldException e) {
- }
-
- try {
- a.getBoolean(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
-
- try {
- a.getByte(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getCharacter(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getDouble(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getFloat(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getInteger(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getLong(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
- try {
- a.getShort(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
-
- StreamRecord c = new StreamRecord(new Tuple1(1));
- try {
- c.getString(0);
- fail();
- } catch (FieldTypeMismatchException e) {}
-
- }
-
- @Test
- public void writeReadTest() {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(buff);
+ ObjectOutputStream out = new ObjectOutputStream(buff);
- int num = 42;
- String str = "above clouds";
- Integer[] intArray = new Integer[] { 1, 2 };
- Tuple3 tuple1 = new Tuple3(num,
- str, intArray);
- Tuple3 tuple2 = new Tuple3(1, "",
- new Integer[] { 1, 2 });
- StreamRecord rec = new StreamRecord(tuple1);
- rec.addTuple(tuple2);
+ MyGeneric> g = new MyGeneric2();
+ out.writeObject(g);
- try {
- rec.write(out);
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buff.toByteArray()));
- StreamRecord newRec = new StreamRecord();
- newRec.read(in);
+ MyGeneric> f = (MyGeneric>) in.readObject();
- assertEquals(2, newRec.getNumOfTuples());
+ TypeInformation> ti = TypeExtractor.createTypeInfo(MyGeneric.class, f.getClass(), 0,
+ null, null);
- @SuppressWarnings("unchecked")
- Tuple3 tupleOut1 = (Tuple3) newRec
- .getTuple(0);
+ System.out.println("Type info: " + ti);
- assertEquals(tupleOut1.getField(0), 42);
- assertEquals(str, tupleOut1.getField(1));
- assertArrayEquals(intArray, (Integer[]) tupleOut1.getField(2));
-
- @SuppressWarnings("unchecked")
- Tuple3 tupleOut2 = (Tuple3) newRec
- .getTuple(1);
- assertEquals(tupleOut2.getField(0), 1);
- assertEquals("", tupleOut2.getField(1));
- assertArrayEquals(new Integer[] { 1, 2 }, (Integer[]) tupleOut2.getField(2));
-
- } catch (IOException e) {
- fail();
- e.printStackTrace();
- }
-
- }
-
- @Test
- public void tupleCopyTest() {
- Tuple3 t1 = new Tuple3("a", 1,
- new Double[] { 4.2 });
-
- @SuppressWarnings("rawtypes")
- Tuple3 t2 = (Tuple3) StreamRecord.copyTuple(t1);
-
- assertEquals("a", t2.getField(0));
- assertEquals(1, t2.getField(1));
- assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
-
- t1.setField(2, 1);
- assertEquals(1, t2.getField(1));
- assertEquals(2, t1.getField(1));
-
- t1.setField(new Double[] { 3.14 }, 2);
- assertArrayEquals(new Double[] { 3.14 }, (Double[]) t1.getField(2));
- assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
-
- assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
- assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
- }
-
- @Test
- public void tupleArraySerializationTest() throws IOException {
- Tuple9 t1 = new Tuple9(
- new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
- new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
- new Long[] { 12345678900l }, new Short[] { 12345 }, new String[] { "something" });
-
- StreamRecord s1 = new StreamRecord(t1);
- StreamRecord s2 = s1.copySerialized();
-
- @SuppressWarnings("rawtypes")
- Tuple9 t2 = (Tuple9) s2.getTuple();
-
- assertArrayEquals(new Boolean[] { true }, (Boolean[]) t2.getField(0));
- assertArrayEquals(new Byte[] { 12 }, (Byte[]) t2.getField(1));
- assertArrayEquals(new Character[] { 'a' }, (Character[]) t2.getField(2));
- assertArrayEquals(new Double[] { 12.5 }, (Double[]) t2.getField(3));
- assertArrayEquals(new Float[] { 13.5f }, (Float[]) t2.getField(4));
- assertArrayEquals(new Integer[] { 1234 }, (Integer[]) t2.getField(5));
- assertArrayEquals(new Long[] { 12345678900l }, (Long[]) t2.getField(6));
- assertArrayEquals(new Short[] { 12345 }, (Short[]) t2.getField(7));
- assertArrayEquals(new String[] { "something" }, (String[]) t2.getField(8));
-
- assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
- assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
- assertEquals(t1.getField(2).getClass(), t2.getField(2).getClass());
- assertEquals(t1.getField(3).getClass(), t2.getField(3).getClass());
- assertEquals(t1.getField(4).getClass(), t2.getField(4).getClass());
- assertEquals(t1.getField(5).getClass(), t2.getField(5).getClass());
- assertEquals(t1.getField(6).getClass(), t2.getField(6).getClass());
- assertEquals(t1.getField(7).getClass(), t2.getField(7).getClass());
- assertEquals(t1.getField(8).getClass(), t2.getField(8).getClass());
}
- // TODO:measure performance of different serialization logics
- @Test
- public void typeCopyTest() throws NoSuchTupleException, IOException {
- StreamRecord rec = new StreamRecord(
- new Tuple9(
- (Boolean) true, (Byte) (byte) 12, (Character) 'a', (Double) 12.5,
- (Float) (float) 13.5, (Integer) 1234, (Long) 12345678900l,
- (Short) (short) 12345, "something"));
-
- ByteArrayOutputStream buff3 = new ByteArrayOutputStream();
- DataOutputStream out3 = new DataOutputStream(buff3);
- for (int i = 0; i < 1000; i++) {
- out3.write(rec.tupleTypesToByteArray(rec.getTuple()));
- }
-
- }
-
- @Test
- public void typeArrayCopyTest() throws NoSuchTupleException, IOException {
- StreamRecord rec = new StreamRecord(
- new Tuple9(
- new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
- new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
- new Long[] { 12345678900l }, new Short[] { 12345 },
- new String[] { "something" }));
-
- ByteArrayOutputStream buff = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(buff);
- for (int i = 0; i < 10000; i++) {
- out.write(rec.tupleTypesToByteArray(rec.getTuple()));
- }
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
- StreamRecord rec2 = new StreamRecord();
- Long start = System.nanoTime();
- for (int i = 0; i < 10000; i++) {
- byte[] byteTypes = new byte[9];
- in.read(byteTypes);
- TypeInformation>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
- @SuppressWarnings("unused")
- TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
- }
- System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns");
-
- start = System.nanoTime();
-
- byte[] byteTypes = rec.tupleTypesToByteArray(rec.getTuple());
- Tuple t = rec.getTuple();
-
- start = System.nanoTime();
- for (int i = 0; i < 10000; i++) {
- // rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
- TypeInformation>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
- @SuppressWarnings("unused")
- TupleTypeInfo typeInfo = new TupleTypeInfo(basicTypes);
- }
- System.out.println("Write with infoArray:\t\t" + (System.nanoTime() - start) + " ns");
- start = System.nanoTime();
- for (int i = 0; i < 10000; i++) {
- @SuppressWarnings("unused")
- TupleTypeInfo typeInfo = (TupleTypeInfo) TypeExtractor.getForObject(t);
- }
- System.out.println("Write with extract:\t\t" + (System.nanoTime() - start) + " ns");
- }
-
- @Test
- public void batchIteratorTest() {
-
- List tupleList = new LinkedList();
- tupleList.add(new Tuple1("Big"));
- tupleList.add(new Tuple1("Data"));
-
- StreamRecord a = new StreamRecord(tupleList);
-
- assertEquals(2, a.getNumOfTuples());
- assertEquals(1, a.getNumOfFields());
-
- for (Tuple t : a.getBatchIterable()) {
- System.out.println(t);
- }
-
- OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
- ols.newSampleData(new double[] { 1.0, 2.0 }, new double[][] { { 1, 2 }, { 3, 4 } });
- System.out.println(Arrays.toString(ols.estimateRegressionParameters()));
-
- }
}