提交 f30581c7 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Strated implementing StreamRecord with Tuple

上级 440f23ba
......@@ -19,36 +19,12 @@ This tutorial shows how to build Stratosphere Streaming on your own system. Plea
```
git clone https://github.com/stratosphere/stratosphere-streaming.git
cd stratosphere
mvn clean assembly:assembly
mvn clean package
```
### Run an example
To run an example counting the frequencies of unique words in the text of Shakespeare's [Hamlet](http://www.gutenberg.org/cache/epub/1787/pg1787.txt)
```
java -cp target/stratosphere-streaming-0.5-SNAPSHOT-jar-with-dependencies.jar eu.stratosphere.streaming.examples.wordcount.WordCountLocal
git clone https://github.com/stratosphere/stratosphere-streaming.git
cd stratosphere
mvn clean package
```
## Support
Don’t hesitate to ask!
[Open an issue](https://github.com/stratosphere/stratosphere-streaming/issues/new) on Github, if you found a bug or need any help.
The main project also has a [mailing list](https://groups.google.com/d/forum/stratosphere-dev) for both users and developers.
## Fork and Contribute
This is an active open-source project. We are always open to people who want to use the system or contribute to it.
Contact us if you are looking for implementation tasks that fit your skills.
The main project a list of [starter jobs](https://github.com/stratosphere/stratosphere/wiki/Starter-Jobs) in our wiki.
We use the GitHub Pull Request system for the development of Stratosphere. Just open a request if you want to contribute.
### What to contribute
* Bug reports
* Bug fixes
* Documentation
* Tools that ease the use and development of Stratosphere
* Well-written Stratosphere jobs
Let us know if you have created a system that uses Stratosphere, so that we can link to you.
......@@ -15,19 +15,42 @@
package eu.stratosphere.streaming.api.streamrecord;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple10;
import eu.stratosphere.api.java.tuple.Tuple11;
import eu.stratosphere.api.java.tuple.Tuple12;
import eu.stratosphere.api.java.tuple.Tuple13;
import eu.stratosphere.api.java.tuple.Tuple14;
import eu.stratosphere.api.java.tuple.Tuple15;
import eu.stratosphere.api.java.tuple.Tuple16;
import eu.stratosphere.api.java.tuple.Tuple17;
import eu.stratosphere.api.java.tuple.Tuple18;
import eu.stratosphere.api.java.tuple.Tuple19;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple20;
import eu.stratosphere.api.java.tuple.Tuple21;
import eu.stratosphere.api.java.tuple.Tuple22;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple6;
import eu.stratosphere.api.java.tuple.Tuple7;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
......@@ -38,15 +61,19 @@ import eu.stratosphere.types.Value;
* objects in Stratosphere stream processing. The elements of the batch are
* Value arrays.
*/
public class StreamRecord implements IOReadableWritable, Serializable {
public class StreamRecord<T extends Tuple> implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
private List<Value[]> recordBatch;
private List<T> recordBatch;
private StringValue uid = new StringValue("");
private int numOfFields;
private int numOfRecords;
private Class<? extends Tuple> clazz = null;
// private Random rnd = new Random();
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class,
Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class,
Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
/**
......@@ -54,7 +81,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord() {
this.numOfFields = 1;
recordBatch = new ArrayList<Value[]>();
recordBatch = new ArrayList<T>();
}
/**
......@@ -66,7 +93,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord(int length) {
numOfFields = length;
recordBatch = new ArrayList<Value[]>();
recordBatch = new ArrayList<T>();
}
/**
......@@ -81,7 +108,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord(int length, int batchSize) {
numOfFields = length;
recordBatch = new ArrayList<Value[]>(batchSize);
recordBatch = new ArrayList<T>(batchSize);
}
/**
......@@ -91,10 +118,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param values
* Array containing the Values for the first record in the batch
*/
public StreamRecord(Value... values) {
this(values.length, 1);
numOfRecords = 1;
recordBatch.add(values);
public StreamRecord(T tuple) {
this(tuple.getArity(), 0);
addRecord(tuple);
}
/**
......@@ -144,7 +170,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Value getField(int recordNumber, int fieldNumber) {
try {
return recordBatch.get(recordNumber)[fieldNumber];
return recordBatch.get(recordNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
......@@ -160,7 +186,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public Value getField(int fieldNumber) {
try {
return recordBatch.get(0)[fieldNumber];
return recordBatch.get(0).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
......@@ -176,9 +202,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param value
* Value to set
*/
public void setField(int recordNumber, int fieldNumber, Value value) {
public void setField(int recordNumber, int fieldNumber, Object o) {
try {
recordBatch.get(recordNumber)[fieldNumber] = value;
recordBatch.get(recordNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
......@@ -192,9 +218,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param value
* Value to set the given field to
*/
public void setField(int fieldNumber, Value value) {
public void setField(int fieldNumber, Object o) {
try {
recordBatch.get(0)[fieldNumber] = value;
recordBatch.get(0).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
......@@ -205,7 +231,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public Value[] getRecord(int recordNumber) {
public Tuple getRecord(int recordNumber) {
try {
return recordBatch.get(recordNumber);
} catch (IndexOutOfBoundsException e) {
......@@ -216,22 +242,30 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* @return Value array containing the fields of first the record
*/
public Value[] getRecord() {
public Tuple getRecord() {
return getRecord(0);
}
// TODO do not use this
private void setClass(T tuple) {
if (clazz == null) {
clazz = tuple.getClass();
}
}
/**
* Sets a record at the given position in the batch
*
* @param recordNumber
* Position of record in the batch
* @param fields
* @param tuple
* Value to set
*/
public void setRecord(int recordNumber, Value... fields) {
if (fields.length == numOfFields) {
public void setRecord(int recordNumber, T tuple) {
if (tuple.getArity() == numOfFields) {
try {
recordBatch.set(recordNumber, fields);
setClass(tuple);
recordBatch.set(recordNumber, tuple);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
......@@ -243,16 +277,17 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* Sets the first record in the batch
*
* @param fields
* @param tuple
* Value to set
*/
public void setRecord(Value... fields) {
if (fields.length == numOfFields) {
public void setRecord(T tuple) {
if (tuple.getArity() == numOfFields) {
setClass(tuple);
if (numOfRecords != 1) {
recordBatch = new ArrayList<Value[]>(1);
recordBatch.add(fields);
recordBatch = new ArrayList<T>(1);
recordBatch.add(tuple);
} else {
recordBatch.set(0, fields);
recordBatch.set(0, tuple);
}
} else {
throw (new RecordSizeMismatchException());
......@@ -263,12 +298,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Checks if the number of fields are equal to the batch field size then
* adds the Value array to the end of the batch
*
* @param fields
* @param tuple
* Value array to be added as the next record of the batch
*/
public void addRecord(Value... fields) {
if (fields.length == numOfFields) {
recordBatch.add(fields);
public void addRecord(T tuple) {
if (tuple.getArity() == numOfFields) {
setClass(tuple);
recordBatch.add(tuple);
numOfRecords++;
} else {
throw new RecordSizeMismatchException();
......@@ -281,28 +317,28 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return Copy of the StreamRecord
*
*/
// TODO:Fix record copy
public StreamRecord copy() {
StreamRecord copiedRecord = new StreamRecord(this.numOfFields, this.numOfRecords);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
// TODO implement!
return null;
}
DataOutputStream out = new DataOutputStream(byteStream);
private void writeTuple(Tuple tuple, DataOutput out) {
TypeInformation<? extends Tuple> typeInfo = TypeInformation.getForObject(tuple);
try {
this.write(out);
} catch (IOException e) {
e.printStackTrace();
}
DataInputStream in = new DataInputStream(new ByteArrayInputStream(byteStream.toByteArray()));
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(tupleSerializer);
serializationDelegate.setInstance(tuple);
}
try {
copiedRecord.read(in);
} catch (IOException e) {
e.printStackTrace();
}
private void readTuple(Tuple tuple, DataInput in, int arity) throws IOException {
// TODO get the type somehow!s
TypeInformation<? extends Tuple> typeInfo = null;
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();
return copiedRecord;
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
}
@Override
......@@ -315,15 +351,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// Write the number of records with an IntValue
(new IntValue(numOfRecords)).write(out);
StringValue classNameValue = new StringValue("");
// write the records
for (Value[] record : recordBatch) {
// Write the fields
for (int i = 0; i < numOfFields; i++) {
classNameValue.setValue(record[i].getClass().getName());
classNameValue.write(out);
record[i].write(out);
}
for (Tuple tuple : recordBatch) {
writeTuple(tuple, out);
}
}
......@@ -342,45 +371,22 @@ public class StreamRecord implements IOReadableWritable, Serializable {
numOfRecords = numOfRecordsValue.getValue();
// Make sure the fields have numOfFields elements
recordBatch = new ArrayList<Value[]>();
StringValue stringValue = new StringValue("");
recordBatch = new ArrayList<T>();
for (int k = 0; k < numOfRecords; ++k) {
Value[] record = new Value[numOfFields];
// Read the fields
for (int i = 0; i < numOfFields; i++) {
stringValue.read(in);
try {
record[i] = (Value) Class.forName(stringValue.getValue()).newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
record[i].read(in);
}
recordBatch.add(record);
T tuple = null;
readTuple(tuple, in, numOfFields);
recordBatch.add(tuple);
}
}
// TODO: fix this method to work properly for non StringValue types
public String toString() {
StringBuilder outputString = new StringBuilder("(");
StringValue output;
for (int k = 0; k < numOfRecords; ++k) {
for (int i = 0; i < numOfFields; i++) {
try {
output = (StringValue) recordBatch.get(k)[i];
outputString.append(output.getValue() + ",");
} catch (ClassCastException e) {
outputString.append("NON-STRING,");
}
}
StringBuilder outputString = new StringBuilder("[");
for (Tuple tuple : recordBatch) {
outputString.append(tuple + ",");
}
outputString.append(")");
outputString.append("]");
return outputString.toString();
}
......
......@@ -20,11 +20,15 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.junit.Test;
import eu.stratosphere.streaming.api.streamrecord.NoSuchRecordException;
import eu.stratosphere.streaming.api.streamrecord.RecordSizeMismatchException;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
......@@ -77,14 +81,14 @@ public class StreamRecordTest {
@Test
public void copyTest() {
//TODO:test ID copy
// TODO:test ID copy
StreamRecord a = new StreamRecord(new StringValue("Big"));
StreamRecord b = a.copy();
assertTrue(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
b.setRecord(new StringValue("Data"));
assertFalse(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
}
@Test
public void cloneTest() {
StringValue sv = new StringValue("V1");
......@@ -112,7 +116,7 @@ public class StreamRecordTest {
fail();
} catch (RecordSizeMismatchException e) {
}
try {
a.getField(3);
fail();
......@@ -120,4 +124,29 @@ public class StreamRecordTest {
}
}
@Test
public void writeReadTest() {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
int num = 42;
String str = "above clouds";
StreamRecord<Tuple2<Integer, String>> rec = new StreamRecord<Tuple2<Integer, String>>(new Tuple2<Integer, String>(num, str));
try {
rec.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord<Tuple2<Integer, String>> newRec = new StreamRecord<Tuple2<Integer, String>>(2);
newRec.read(in);
assertEquals(num, newRec.getField(0));
assertEquals(str, newRec.getField(1));
} catch (IOException e) {
fail();
e.printStackTrace();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册