提交 942d6c04 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] array based streamrecord added

上级 453109de
...@@ -5,14 +5,13 @@ ...@@ -5,14 +5,13 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<version>0.2-SNAPSHOT</version> <version>0.5</version>
<artifactId>stratosphere-streaming</artifactId> <artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name> <name>stratosphere-streaming</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties> </properties>
...@@ -30,32 +29,32 @@ ...@@ -30,32 +29,32 @@
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId> <artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId> <artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId> <artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId> <artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId> <artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId> <artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
......
package eu.stratosphere.api.datastream;
import eu.stratosphere.types.TypeInformation;
public class DataStream<T> {
private final StreamExecutionEnvironment context;
private final TypeInformation<T> type;
protected DataStream(StreamExecutionEnvironment context, TypeInformation<T> type) {
if (context == null) {
throw new NullPointerException("context is null");
}
if (type == null) {
throw new NullPointerException("type is null");
}
this.context = context;
this.type = type;
}
public TypeInformation<T> getType() {
return this.type;
}
}
\ No newline at end of file
package eu.stratosphere.api.datastream;
import eu.stratosphere.types.TypeInformation;
public abstract class SingleStreamInputOperator<IN, OUT, O extends SingleStreamInputOperator<IN, OUT, O>> extends StreamOperator<OUT, O> {
protected SingleStreamInputOperator(StreamExecutionEnvironment context,
TypeInformation<OUT> type) {
super(context, type);
}
}
package eu.stratosphere.api.datastream;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
public StreamExecutionEnvironment() {
jobGraphBuilder = new JobGraphBuilder("jobGraph",FaultToleranceType.NONE);
}
// public static StreamExecutionEnvironment getLocalEnvironment() {
// return new StreamExecutionEnvironment();
// }
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
}
package eu.stratosphere.api.datastream;
import eu.stratosphere.types.TypeInformation;
public abstract class StreamOperator<OUT, O extends StreamOperator<OUT, O>> extends DataStream<OUT> {
protected StreamOperator(StreamExecutionEnvironment context, TypeInformation<OUT> type) {
super(context, type);
}
}
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class StreamCollector implements Collector<Tuple> {
protected ArrayStreamRecord streamRecord;
protected int batchSize;
protected int counter = 0;
protected int channelID;
public StreamCollector(int batchSize, int channelID) {
this.batchSize = batchSize;
this.streamRecord = new ArrayStreamRecord(batchSize);
this.channelID = channelID;
}
@Override
public void collect(Tuple tuple) {
streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
counter++;
if (counter >= batchSize) {
counter = 0;
streamRecord.setId(channelID);
emit(streamRecord);
}
}
private void emit(ArrayStreamRecord streamRecord) {
System.out.println(streamRecord);
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple10;
import eu.stratosphere.api.java.tuple.Tuple11;
import eu.stratosphere.api.java.tuple.Tuple12;
import eu.stratosphere.api.java.tuple.Tuple13;
import eu.stratosphere.api.java.tuple.Tuple14;
import eu.stratosphere.api.java.tuple.Tuple15;
import eu.stratosphere.api.java.tuple.Tuple16;
import eu.stratosphere.api.java.tuple.Tuple17;
import eu.stratosphere.api.java.tuple.Tuple18;
import eu.stratosphere.api.java.tuple.Tuple19;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple20;
import eu.stratosphere.api.java.tuple.Tuple21;
import eu.stratosphere.api.java.tuple.Tuple22;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple6;
import eu.stratosphere.api.java.tuple.Tuple7;
import eu.stratosphere.api.java.tuple.Tuple8;
import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
/**
* Object for storing serializable records in batch (single records are
* represented batches with one element) used for sending records between task
* objects in Stratosphere stream processing. The elements of the batch are
* Tuples.
*/
public class ArrayStreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
private Tuple[] tupleBatch;
private UID uid = new UID();
private int batchSize;
private TupleSerializer<Tuple> tupleSerializer;
SerializationDelegate<Tuple> serializationDelegate;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
/**
* Creates a new empty instance for read
*/
public ArrayStreamRecord() {
}
public ArrayStreamRecord(int batchsize) {
this.batchSize = batchsize;
tupleBatch = new Tuple[batchsize];
}
public ArrayStreamRecord(ArrayStreamRecord record) {
tupleBatch = new Tuple[record.batchSize];
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getBatchSize(); ++i) {
this.tupleBatch[i] = copyTuple(record.getTuple(i));
}
this.batchSize = tupleBatch.length;
}
/**
* Creates a new batch of records containing the given Tuple array as
* elements
*
* @param tupleList
* Tuples to bes stored in the StreamRecord
*/
public ArrayStreamRecord(Tuple[] tupleArray) {
this.batchSize = tupleArray.length;
tupleBatch = tupleArray;
}
public void setTupleTypeInfo(TupleTypeInfo<Tuple> typeInfo) {
tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();
serializationDelegate = new SerializationDelegate<Tuple>(tupleSerializer);
}
/**
* @return Number of tuples in the batch
*/
public int getBatchSize() {
return batchSize;
}
/**
* @return The ID of the object
*/
public UID getId() {
return uid;
}
/**
* Set the ID of the StreamRecord object
*
* @param channelID
* ID of the emitting task
* @return The StreamRecord object
*/
public ArrayStreamRecord setId(int channelID) {
uid = new UID(channelID);
return this;
}
/**
* Returns an iterable over the tuplebatch
*
* @return batch iterable
*/
public Iterable<Tuple> getBatchIterable() {
return (Iterable<Tuple>) Arrays.asList(tupleBatch);
}
/**
* @param tupleNumber
* Position of the record in the batch
* @return Chosen tuple
* @throws NoSuchTupleException
* the Tuple does not have this many fields
*/
public Tuple getTuple(int tupleNumber) throws NoSuchTupleException {
try {
return tupleBatch[tupleNumber];
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
}
/**
* Sets a tuple at the given position in the batch with the given tuple
*
* @param tupleNumber
* Position of tuple in the batch
* @param tuple
* Value to set
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
try {
tupleBatch[tupleNumber] = tuple;
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
}
/**
* Creates a deep copy of the StreamRecord
*
* @return Copy of the StreamRecord
*
*/
public ArrayStreamRecord copy() {
ArrayStreamRecord newRecord = new ArrayStreamRecord(batchSize);
newRecord.uid = new UID(Arrays.copyOf(uid.getId(), 20));
for (int i = 0; i < batchSize; i++) {
newRecord.tupleBatch[i] = copyTuple(tupleBatch[i]);
}
return newRecord;
}
/**
* Creates deep copy of Tuple
*
* @param tuple
* Tuple to copy
* @return Copy of the tuple
*/
public static Tuple copyTuple(Tuple tuple) {
// TODO: implement deep copy for arrays
int numofFields = tuple.getArity();
Tuple newTuple = null;
try {
newTuple = (Tuple) CLASSES[numofFields - 1].newInstance();
for (int i = 0; i < numofFields; i++) {
Class<? extends Object> type = tuple.getField(i).getClass();
if (type.isArray()) {
if (type.equals(Boolean[].class)) {
Boolean[] arr = (Boolean[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Byte[].class)) {
Byte[] arr = (Byte[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Character[].class)) {
Character[] arr = (Character[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Double[].class)) {
Double[] arr = (Double[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Float[].class)) {
Float[] arr = (Float[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Integer[].class)) {
Integer[] arr = (Integer[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Long[].class)) {
Long[] arr = (Long[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(Short[].class)) {
Short[] arr = (Short[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
} else if (type.equals(String[].class)) {
String[] arr = (String[]) tuple.getField(i);
newTuple.setField(Arrays.copyOf(arr, arr.length), i);
}
newTuple.setField(tuple.getField(i), i);
} else {
newTuple.setField(tuple.getField(i), i);
}
}
} catch (InstantiationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return newTuple;
}
/**
* Creates a String representation as a list of tuples
*/
public String toString() {
StringBuilder outputString = new StringBuilder("[");
String prefix = "";
for (Tuple tuple : tupleBatch) {
outputString.append(prefix);
prefix = ",";
outputString.append(tuple.toString());
}
outputString.append("]");
return outputString.toString();
}
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
out.writeInt(batchSize);
for (Tuple tuple : tupleBatch) {
serializationDelegate.setInstance(tuple);
serializationDelegate.write(out);
}
}
/**
* Read method definition for the IOReadableWritable interface
*/
@Override
public void read(DataInput in) throws IOException {
uid = new UID();
uid.read(in);
batchSize = in.readInt();
tupleBatch = new Tuple[batchSize];
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
for (int k = 0; k < batchSize; ++k) {
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
tupleBatch[k] = dd.getInstance();
}
}
}
...@@ -15,15 +15,17 @@ ...@@ -15,15 +15,17 @@
package eu.stratosphere.streaming.examples.wordcount; package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable { public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>(); private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = ""; private String word = "";
private Integer count = 0; private Integer count = 0;
...@@ -45,7 +47,7 @@ public class WordCountCounter extends UserTaskInvokable { ...@@ -45,7 +47,7 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setInteger(1, count); outRecord.setInteger(1, count);
emit(outRecord); emit(outRecord);
performanceCounter.count(); // performanceCounter.count();
} }
......
...@@ -48,7 +48,7 @@ public class WordCountSourceSplitter extends UserSourceInvokable { ...@@ -48,7 +48,7 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
outRecord.setString(0, word); outRecord.setString(0, word);
System.out.println("word=" + word); System.out.println("word=" + word);
emit(outRecord); emit(outRecord);
performanceCounter.count(); //performanceCounter.count();
} }
} }
} }
......
package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1;
public class StreamCollectorTest {
@Test
public void testStreamCollector() {
StreamCollector collector = new StreamCollector(10, 0);
assertEquals(10, collector.batchSize);
}
@Test
public void testCollect() {
StreamCollector collector = new StreamCollector(2, 0);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
collector.collect(new Tuple1<Integer>(6));
}
@Test
public void testClose() {
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.types.TypeInformation;
public class ArrayStreamRecordTest {
@Test
public void constructorTest() {
ArrayStreamRecord record = new ArrayStreamRecord(10);
assertEquals(10, record.getBatchSize());
Tuple[] tuples = new Tuple[2];
tuples[0] = new Tuple1<Integer>(2);
tuples[1] = new Tuple1<Integer>(3);
ArrayStreamRecord record1 = new ArrayStreamRecord(tuples);
assertEquals(2, record1.getBatchSize());
ArrayStreamRecord record2 = new ArrayStreamRecord(record1);
assertEquals(2, record2.getBatchSize());
}
@Test
public void typeExtractTest() throws IOException, ClassNotFoundException {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(buff);
MyGeneric<?> g = new MyGeneric2();
out.writeObject(g);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buff.toByteArray()));
MyGeneric<?> f = (MyGeneric<?>) in.readObject();
TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyGeneric.class, f.getClass(), 0,
null, null);
System.out.println("Type info: " + ti);
}
@Test
public void StreamRecordSpeedTest() {
int len = 1000000;
ArrayStreamRecord arecord = new ArrayStreamRecord(len);
StreamRecord record = new StreamRecord(len);
Tuple2<Integer, String> tuple = new Tuple2<Integer, String>(2, "a");
long standardTime=System.nanoTime();
for (int i = 0; i < len; i++) {
record.setTuple(i, tuple);
}
standardTime=System.nanoTime()-standardTime;
long arrayTime=System.nanoTime();
for (int i = 0; i < len; i++) {
arecord.setTuple(i, tuple);
}
arrayTime=System.nanoTime()-arrayTime;
System.out.println("Standard time: "+standardTime);
System.out.println("Array time: "+arrayTime);
float multi = (float)standardTime/(float)arrayTime;
System.out.println("Mulitplier: "+multi);
}
}
package eu.stratosphere.streaming.api.streamrecord;
import java.io.Serializable;
public abstract class MyGeneric<IN> implements Serializable {
public abstract void asd();
}
package eu.stratosphere.streaming.api.streamrecord;
public class MyGeneric2 extends MyGeneric<Integer>{
@Override
public void asd() {
// TODO Auto-generated method stub
}
}
...@@ -15,479 +15,60 @@ ...@@ -15,479 +15,60 @@
package eu.stratosphere.streaming.api.streamrecord; package eu.stratosphere.streaming.api.streamrecord;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.io.ObjectInputStream;
import java.util.LinkedList; import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.api.java.tuple.Tuple9;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.types.TypeInformation; import eu.stratosphere.types.TypeInformation;
public class StreamRecordTest { public class StreamRecordTest {
@Test @Test
public void singleRecordSetGetTest() { public void constructorTest() {
StreamRecord record = new StreamRecord( StreamRecord record = new StreamRecord(10);
new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>( assertEquals(10, record.getBatchSize());
"Stratosphere", 1, 2L, true, 3.5, (byte) 0xa, 'a', 0.1f, (short) 42));
assertEquals(9, record.getNumOfFields()); List<Tuple> tuples = new ArrayList<Tuple>();
assertEquals(1, record.getNumOfTuples()); tuples.add(new Tuple1<Integer>(2));
tuples.add(new Tuple1<Integer>(3));
assertEquals("Stratosphere", record.getString(0)); StreamRecord record1 = new StreamRecord(tuples);
assertEquals((Integer) 1, record.getInteger(1));
assertEquals((Long) 2L, record.getLong(2));
assertEquals(true, record.getBoolean(3));
assertEquals((Double) 3.5, record.getDouble(4));
assertEquals((Byte) (byte) 0xa, record.getByte(5));
assertEquals((Character) 'a', record.getCharacter(6));
assertEquals((Float) 0.1f, record.getFloat(7));
assertEquals((Short) (short) 42, record.getShort(8));
Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short> tuple = new Tuple9<String, Integer, Long, Boolean, Double, Byte, Character, Float, Short>(); assertEquals(2, record1.getBatchSize());
record.getTupleInto(tuple);
assertEquals("Stratosphere", tuple.getField(0));
assertEquals((Integer) 1, tuple.getField(1));
assertEquals((Long) 2L, tuple.getField(2));
assertEquals(true, tuple.getField(3));
assertEquals((Double) 3.5, tuple.getField(4));
assertEquals((Byte) (byte) 0xa, tuple.getField(5));
assertEquals((Character) 'a', tuple.getField(6));
assertEquals((Float) 0.1f, tuple.getField(7));
assertEquals((Short) (short) 42, tuple.getField(8));
record.setString(0, "Streaming");
record.setInteger(1, 2);
record.setLong(2, 3L);
record.setBoolean(3, false);
record.setDouble(4, 4.5);
record.setByte(5, (byte) 0xb);
record.setCharacter(6, 'b');
record.setFloat(7, 0.2f);
record.setShort(8, (short) 69);
assertEquals("Streaming", record.getString(0));
assertEquals((Integer) 2, record.getInteger(1));
assertEquals((Long) 3L, record.getLong(2));
assertEquals(false, record.getBoolean(3));
assertEquals((Double) 4.5, record.getDouble(4));
assertEquals((Byte) (byte) 0xb, record.getByte(5));
assertEquals((Character) 'b', record.getCharacter(6));
assertEquals((Float) 0.2f, record.getFloat(7));
assertEquals((Short) (short) 69, record.getShort(8));
record.setString(0, 0, "");
record.setInteger(0, 1, 0);
record.setLong(0, 2, 0L);
record.setBoolean(0, 3, false);
record.setDouble(0, 4, 0.);
record.setByte(0, 5, (byte) 0x0);
record.setCharacter(0, 6, '\0');
record.setFloat(0, 7, 0.f);
record.setShort(0, 8, (short) 0);
assertEquals("", record.getString(0, 0));
assertEquals((Integer) 0, record.getInteger(0, 1));
assertEquals((Long) 0L, record.getLong(0, 2));
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
assertEquals((Byte) (byte) 0x0, record.getByte(0, 5));
assertEquals((Character) '\0', record.getCharacter(0, 6));
assertEquals((Float) 0.f, record.getFloat(0, 7));
assertEquals((Short) (short) 0, record.getShort(0, 8));
}
@Test
public void batchRecordSetGetTest() {
StreamRecord record = new StreamRecord(5, 2);
Tuple5<String, Integer, Long, Boolean, Double> tuple = new Tuple5<String, Integer, Long, Boolean, Double>(
"Stratosphere", 1, 2L, true, 3.5);
record.addTuple(StreamRecord.copyTuple(tuple));
tuple.setField("", 0);
tuple.setField(0, 1);
tuple.setField(0L, 2);
tuple.setField(false, 3);
tuple.setField(0., 4);
record.addTuple(tuple);
try {
record.addTuple(new Tuple1<String>("4"));
fail();
} catch (TupleSizeMismatchException e) {
}
assertEquals(5, record.getNumOfFields());
assertEquals(2, record.getNumOfTuples());
assertEquals("Stratosphere", record.getString(0, 0));
assertEquals((Integer) 1, record.getInteger(0, 1));
assertEquals((Long) 2L, record.getLong(0, 2));
assertEquals(true, record.getBoolean(0, 3));
assertEquals((Double) 3.5, record.getDouble(0, 4));
assertEquals("", record.getString(1, 0));
assertEquals((Integer) 0, record.getInteger(1, 1));
assertEquals((Long) 0L, record.getLong(1, 2));
assertEquals(false, record.getBoolean(1, 3));
assertEquals((Double) 0., record.getDouble(1, 4));
record.setTuple(new Tuple5<String, Integer, Long, Boolean, Double>("", 0, 0L, false, 0.));
assertEquals(5, record.getNumOfFields());
assertEquals(2, record.getNumOfTuples());
assertEquals("", record.getString(0, 0));
assertEquals((Integer) 0, record.getInteger(0, 1));
assertEquals((Long) 0L, record.getLong(0, 2));
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1,
2L, true, 3.5));
assertEquals("Stratosphere", record.getString(1, 0));
assertEquals((Integer) 1, record.getInteger(1, 1));
assertEquals((Long) 2L, record.getLong(1, 2));
assertEquals(true, record.getBoolean(1, 3));
assertEquals((Double) 3.5, record.getDouble(1, 4));
record.removeTuple(1);
assertEquals(1, record.getNumOfTuples());
assertEquals("", record.getString(0, 0));
assertEquals((Integer) 0, record.getInteger(0, 1));
assertEquals((Long) 0L, record.getLong(0, 2));
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
record.addTuple(0, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1,
2L, true, 3.5));
assertEquals(2, record.getNumOfTuples());
assertEquals("Stratosphere", record.getString(0, 0));
assertEquals((Integer) 1, record.getInteger(0, 1));
assertEquals((Long) 2L, record.getLong(0, 2));
assertEquals(true, record.getBoolean(0, 3));
assertEquals((Double) 3.5, record.getDouble(0, 4));
}
@Test
public void copyTest() throws IOException {
StreamRecord a = new StreamRecord(new Tuple1<String>("Big"));
a.setId(0);
StreamRecord b = a.copy();
assertTrue(a.getField(0).equals(b.getField(0)));
assertTrue(a.getId().equals(b.getId()));
b.setId(2);
b.setTuple(new Tuple1<String>("Data"));
assertFalse(a.getId().equals(b.getId()));
assertFalse(a.getField(0).equals(b.getField(0)));
final int ITERATION = 10000;
StreamRecord c = new StreamRecord(new Tuple1<String>("Big"));
long t = System.nanoTime();
for (int i = 0; i < ITERATION; i++) {
c.copySerialized();
}
long t2 = System.nanoTime() - t;
System.out.println("Serialized copy:\t" + t2 + " ns");
t = System.nanoTime();
for (int i = 0; i < ITERATION; i++) {
c.copy();
}
t2 = System.nanoTime() - t;
System.out.println("Copy:\t" + t2 + " ns");
StreamRecord record2 = new StreamRecord(record1);
assertEquals(2, record2.getBatchSize());
} }
@Test @Test
public void exceptionTest() { public void typeExtractTest() throws IOException, ClassNotFoundException {
StreamRecord a = new StreamRecord(new Tuple1<String>("Big"));
try {
a.setTuple(4, new Tuple1<String>("Data"));
fail();
} catch (NoSuchTupleException e) {
}
try {
a.setTuple(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (TupleSizeMismatchException e) {
}
StreamRecord b = new StreamRecord();
try {
b.addTuple(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (TupleSizeMismatchException e) {
}
try {
a.getField(3);
fail();
} catch (NoSuchFieldException e) {
}
try {
a.getBoolean(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getByte(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getCharacter(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getDouble(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getFloat(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getInteger(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getLong(0);
fail();
} catch (FieldTypeMismatchException e) {}
try {
a.getShort(0);
fail();
} catch (FieldTypeMismatchException e) {}
StreamRecord c = new StreamRecord(new Tuple1<Integer>(1));
try {
c.getString(0);
fail();
} catch (FieldTypeMismatchException e) {}
}
@Test
public void writeReadTest() {
ByteArrayOutputStream buff = new ByteArrayOutputStream(); ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff); ObjectOutputStream out = new ObjectOutputStream(buff);
int num = 42; MyGeneric<?> g = new MyGeneric2();
String str = "above clouds"; out.writeObject(g);
Integer[] intArray = new Integer[] { 1, 2 };
Tuple3<Integer, String, Integer[]> tuple1 = new Tuple3<Integer, String, Integer[]>(num,
str, intArray);
Tuple3<Integer, String, Integer[]> tuple2 = new Tuple3<Integer, String, Integer[]>(1, "",
new Integer[] { 1, 2 });
StreamRecord rec = new StreamRecord(tuple1);
rec.addTuple(tuple2);
try { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buff.toByteArray()));
rec.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord newRec = new StreamRecord(); MyGeneric<?> f = (MyGeneric<?>) in.readObject();
newRec.read(in);
assertEquals(2, newRec.getNumOfTuples()); TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyGeneric.class, f.getClass(), 0,
null, null);
@SuppressWarnings("unchecked") System.out.println("Type info: " + ti);
Tuple3<Integer, String, Integer[]> tupleOut1 = (Tuple3<Integer, String, Integer[]>) newRec
.getTuple(0);
assertEquals(tupleOut1.getField(0), 42);
assertEquals(str, tupleOut1.getField(1));
assertArrayEquals(intArray, (Integer[]) tupleOut1.getField(2));
@SuppressWarnings("unchecked")
Tuple3<Integer, String, Integer[]> tupleOut2 = (Tuple3<Integer, String, Integer[]>) newRec
.getTuple(1);
assertEquals(tupleOut2.getField(0), 1);
assertEquals("", tupleOut2.getField(1));
assertArrayEquals(new Integer[] { 1, 2 }, (Integer[]) tupleOut2.getField(2));
} catch (IOException e) {
fail();
e.printStackTrace();
}
}
@Test
public void tupleCopyTest() {
Tuple3<String, Integer, Double[]> t1 = new Tuple3<String, Integer, Double[]>("a", 1,
new Double[] { 4.2 });
@SuppressWarnings("rawtypes")
Tuple3 t2 = (Tuple3) StreamRecord.copyTuple(t1);
assertEquals("a", t2.getField(0));
assertEquals(1, t2.getField(1));
assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
t1.setField(2, 1);
assertEquals(1, t2.getField(1));
assertEquals(2, t1.getField(1));
t1.setField(new Double[] { 3.14 }, 2);
assertArrayEquals(new Double[] { 3.14 }, (Double[]) t1.getField(2));
assertArrayEquals(new Double[] { 4.2 }, (Double[]) t2.getField(2));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
}
@Test
public void tupleArraySerializationTest() throws IOException {
Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]> t1 = new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
new Long[] { 12345678900l }, new Short[] { 12345 }, new String[] { "something" });
StreamRecord s1 = new StreamRecord(t1);
StreamRecord s2 = s1.copySerialized();
@SuppressWarnings("rawtypes")
Tuple9 t2 = (Tuple9) s2.getTuple();
assertArrayEquals(new Boolean[] { true }, (Boolean[]) t2.getField(0));
assertArrayEquals(new Byte[] { 12 }, (Byte[]) t2.getField(1));
assertArrayEquals(new Character[] { 'a' }, (Character[]) t2.getField(2));
assertArrayEquals(new Double[] { 12.5 }, (Double[]) t2.getField(3));
assertArrayEquals(new Float[] { 13.5f }, (Float[]) t2.getField(4));
assertArrayEquals(new Integer[] { 1234 }, (Integer[]) t2.getField(5));
assertArrayEquals(new Long[] { 12345678900l }, (Long[]) t2.getField(6));
assertArrayEquals(new Short[] { 12345 }, (Short[]) t2.getField(7));
assertArrayEquals(new String[] { "something" }, (String[]) t2.getField(8));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
assertEquals(t1.getField(2).getClass(), t2.getField(2).getClass());
assertEquals(t1.getField(3).getClass(), t2.getField(3).getClass());
assertEquals(t1.getField(4).getClass(), t2.getField(4).getClass());
assertEquals(t1.getField(5).getClass(), t2.getField(5).getClass());
assertEquals(t1.getField(6).getClass(), t2.getField(6).getClass());
assertEquals(t1.getField(7).getClass(), t2.getField(7).getClass());
assertEquals(t1.getField(8).getClass(), t2.getField(8).getClass());
} }
// TODO:measure performance of different serialization logics
@Test
public void typeCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord(
new Tuple9<Boolean, Byte, Character, Double, Float, Integer, Long, Short, String>(
(Boolean) true, (Byte) (byte) 12, (Character) 'a', (Double) 12.5,
(Float) (float) 13.5, (Integer) 1234, (Long) 12345678900l,
(Short) (short) 12345, "something"));
ByteArrayOutputStream buff3 = new ByteArrayOutputStream();
DataOutputStream out3 = new DataOutputStream(buff3);
for (int i = 0; i < 1000; i++) {
out3.write(rec.tupleTypesToByteArray(rec.getTuple()));
}
}
@Test
public void typeArrayCopyTest() throws NoSuchTupleException, IOException {
StreamRecord rec = new StreamRecord(
new Tuple9<Boolean[], Byte[], Character[], Double[], Float[], Integer[], Long[], Short[], String[]>(
new Boolean[] { true }, new Byte[] { 12 }, new Character[] { 'a' },
new Double[] { 12.5 }, new Float[] { 13.5f }, new Integer[] { 1234 },
new Long[] { 12345678900l }, new Short[] { 12345 },
new String[] { "something" }));
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
for (int i = 0; i < 10000; i++) {
out.write(rec.tupleTypesToByteArray(rec.getTuple()));
}
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord rec2 = new StreamRecord();
Long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
byte[] byteTypes = new byte[9];
in.read(byteTypes);
TypeInformation<?>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(basicTypes);
}
System.out.println("Type copy with ByteArray:\t" + (System.nanoTime() - start) + " ns");
start = System.nanoTime();
byte[] byteTypes = rec.tupleTypesToByteArray(rec.getTuple());
Tuple t = rec.getTuple();
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
// rec2.tupleBasicTypesFromByteArray(byteTypes, 9);
TypeInformation<?>[] basicTypes = rec2.tupleTypesFromByteArray(byteTypes);
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(basicTypes);
}
System.out.println("Write with infoArray:\t\t" + (System.nanoTime() - start) + " ns");
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
@SuppressWarnings("unused")
TupleTypeInfo<Tuple> typeInfo = (TupleTypeInfo<Tuple>) TypeExtractor.getForObject(t);
}
System.out.println("Write with extract:\t\t" + (System.nanoTime() - start) + " ns");
}
@Test
public void batchIteratorTest() {
List<Tuple> tupleList = new LinkedList<Tuple>();
tupleList.add(new Tuple1<String>("Big"));
tupleList.add(new Tuple1<String>("Data"));
StreamRecord a = new StreamRecord(tupleList);
assertEquals(2, a.getNumOfTuples());
assertEquals(1, a.getNumOfFields());
for (Tuple t : a.getBatchIterable()) {
System.out.println(t);
}
OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
ols.newSampleData(new double[] { 1.0, 2.0 }, new double[][] { { 1, 2 }, { 3, 4 } });
System.out.println(Arrays.toString(ols.estimateRegressionParameters()));
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册