提交 3d05ef47 编写于 作者: J judit 提交者: Stephan Ewen

[streaming] new tests in MapTest

上级 e31a9813
......@@ -94,12 +94,6 @@
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -54,7 +54,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
@Override
public void collect(T tuple) {
//TODO: move copy to StreamCollector2
streamRecord.setTuple(counter, tuple);
streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
counter++;
if (counter >= batchSize) {
......
......@@ -65,17 +65,15 @@ public class StreamCollector2<T extends Tuple> implements Collector<T> {
// TODO copy here instead of copying inside every StreamCollector
@Override
public void collect(T tuple) {
T copiedTuple = StreamRecord.copyTuple(tuple);
public void collect(T record) {
for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
collector.collect(copiedTuple);
collector.collect(record);
}
int partitionHash = Math.abs(copiedTuple.getField(keyPostition).hashCode());
int partitionHash = Math.abs(record.getField(keyPostition).hashCode());
for (StreamCollector<Tuple>[] collectors : partitionedCollectors) {
collectors[partitionHash % collectors.length].collect(copiedTuple);
collectors[partitionHash % collectors.length].collect(record);
}
}
......
......@@ -53,7 +53,7 @@ public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
@Override
public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
long progress = (Long) value.getField(windowFieldId);
long progress = value.getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
......
......@@ -147,12 +147,12 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable {
* Tuple to copy
* @return Copy of the tuple
*/
public static <T extends Tuple> T copyTuple(T tuple) {
public static Tuple copyTuple(Tuple tuple) {
// TODO: implement deep copy for arrays
int numofFields = tuple.getArity();
T newTuple = null;
Tuple newTuple = null;
try {
newTuple = (T) CLASSES[numofFields - 1].newInstance();
newTuple = (Tuple) CLASSES[numofFields - 1].newInstance();
for (int i = 0; i < numofFields; i++) {
Class<? extends Object> type = tuple.getField(i).getClass();
......
......@@ -20,6 +20,8 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
......@@ -33,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.MapTest.MyMap;
import eu.stratosphere.streaming.api.MapTest.MySink;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -40,129 +44,61 @@ import eu.stratosphere.util.Collector;
public class FlatMapTest {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
out.collect(value);
System.out.println("flatMap");
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0*value.f0));
}
}
public static final class MySink extends SinkFunction<Tuple1<String>> {
int c=0;
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<String> tuple) {
System.out.println(tuple);
c++;
System.out.println(c);
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
System.out.println("result " + tuple.f0);
}
}
public static final class MySource extends SourceFunction<Tuple1<String>> {
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<String>> collector) {
for (int i = 0; i < 5; i++) {
collector.collect(new Tuple1<String>("hi"));
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
}
}
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
System.out.println("expected " + i*i);
}
}
private static final int PARALELISM = 2;
private static final int PARALELISM = 1;
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
@Test
public void test() throws Exception {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000);
fail();
} catch (IllegalArgumentException e) {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0);
fail();
} catch (IllegalArgumentException e2) {
}
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000);
DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource(),1);
DataStream<Tuple1<Integer>> dataStream0 = context.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)
.partitionBy(0).flatMap(new MyFlatMap(), PARALELISM).broadcast().addSink(new MySink());
context.execute();
JobGraphBuilder jgb = context.jobGB();
for (AbstractJobVertex c : jgb.components.values()) {
if (c instanceof JobTaskVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
f.flatMap(t, s);
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
FlatMapFunction.class, f.getClass(), 0, null, null);
System.out.println(ts);
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
if (c instanceof JobOutputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
SinkFunction.class, f.getClass(), 0, null, null);
System.out.println(ts);
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserSinkInvokable userFunction = (UserSinkInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
if (c instanceof JobInputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, f.getClass(), 0, null, null);
System.out.println(ts);
System.out.println("----------------");
}
}
fillExpectedList();
assertTrue(expected.equals(result));
}
}
......@@ -15,123 +15,163 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import org.jblas.util.Random;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
import eu.stratosphere.util.Collector;
public class MapTest {
public static final class MyMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
@Override
public Tuple1<String> map(Tuple1<String> value) throws Exception {
System.out.println("map");
return value;
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MyFieldsSource extends SourceFunction<Tuple1<Integer>> {
private static final int PARALELISM = 1;
@Test
public void test() throws Exception {
Tuple1<String> tup = new Tuple1<String>("asd");
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> dataStream = context.addDummySource().map(new MyMap(), PARALELISM)
.addDummySink();
context.execute();
JobGraphBuilder jgb = context.jobGB();
for (AbstractJobVertex c : jgb.components.values()) {
if (c instanceof JobTaskVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
MapFunction<Tuple, Tuple> f = (MapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
s.collect(f.map(t));
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
MapFunction.class, f.getClass(), 0, null, null);
System.out.println(ts);
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
@Override
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<MAXSOURCE; i++){
collector.collect(new Tuple1<Integer>(5));
}
}
}
public static final class MyMap extends MapFunction<Tuple1<Integer>,Tuple1<Integer>> {
if (c instanceof JobOutputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
// TODO Auto-generated method stub
return new Tuple1<Integer>(value.f0*value.f0);
}
}
public static final class MyFieldsMap extends MapFunction<Tuple1<Integer>,Tuple1<Integer>> {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
private int counter=0;
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
// TODO Auto-generated method stub
counter++;
if(counter==MAXSOURCE) allInOne=true;
return new Tuple1<Integer>(value.f0*value.f0);
}
}
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
SinkFunction.class, f.getClass(), 0, null, null);
@Override
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
//System.out.println("result " + tuple.f0);
}
}
public static final class MyBroadcastSink extends SinkFunction<Tuple1<Integer>> {
System.out.println(ts);
@Override
public void invoke(Tuple1<Integer> tuple) {
broadcastResult++;
}
}
public static final class MyFieldsSink extends SinkFunction<Tuple1<Integer>> {
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserSinkInvokable userFunction = (UserSinkInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
@Override
public void invoke(Tuple1<Integer> tuple) {
fieldsResult++;
}
}
if (c instanceof JobInputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static int broadcastResult = 0;
private static int fieldsResult = 0;
private static int fieldsCounter = 0;
private static final int PARALELISM = 1;
private static final int MAXSOURCE = 10;
private static boolean allInOne=false;
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
//System.out.println("expected " + i*i);
}
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
DataStream<Tuple1<Integer>> dataStream = context
.addSource(new MySource(), 1)
.map(new MyMap(), PARALELISM)
.addSink(new MySink());
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, f.getClass(), 0, null, null);
context.execute();
fillExpectedList();
System.out.println(ts);
System.out.println("----------------");
}
}
assertTrue(expected.equals(result));
}
@Test
public void broadcastTest() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> dataStream = context
.addSource(new MySource(), 1)
.broadcast()
.map(new MyMap(), 3)
.addSink(new MyBroadcastSink());
context.execute();
assertEquals(30, broadcastResult);
}
@Test
public void fieldsSinkTest() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> dataStream = context
.addSource(new MySource(), 1)
.partitionBy(0)
.map(new MyMap(), 3)
.addSink(new MyFieldsSink());
context.execute();
assertEquals(10, fieldsResult);
}
@Test
public void fieldsMapTest() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> dataStream = context
.addSource(new MyFieldsSource(), 1)
.partitionBy(0)
.map(new MyFieldsMap(), 3)
.addSink(new MyFieldsSink());
context.execute();
assertTrue(allInOne);
}
}
......@@ -15,8 +15,6 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
......@@ -25,20 +23,18 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollector2Test {
StreamCollector2<Tuple> collector;
@Test
public void testCollect() {
List<Integer> batchSizesOfNotPartitioned = new ArrayList<Integer>();
List<Integer> batchSizesOfPartitioned = new ArrayList<Integer>();
batchSizesOfPartitioned.add(2);
batchSizesOfPartitioned.add(3);
batchSizesOfPartitioned.add(2);
List<Integer> parallelismOfOutput = new ArrayList<Integer>();
parallelismOfOutput.add(2);
parallelismOfOutput.add(2);
......@@ -48,11 +44,8 @@ public class StreamCollector2Test {
List<RecordWriter<StreamRecord>> fOut = new ArrayList<RecordWriter<StreamRecord>>();
MockRecordWriter rw1 = MockRecordWriterFactory.create();
MockRecordWriter rw2 = MockRecordWriterFactory.create();
fOut.add(rw1);
fOut.add(rw2);
fOut.add(null);
fOut.add(null);
collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
......@@ -62,28 +55,15 @@ public class StreamCollector2Test {
t.f0 = 0;
collector.collect(t);
t.f0 = 1;
collector.collect(t);
collector.collect(t);
t.f0 = 0;
collector.collect(t);
StreamRecord r1 = rw1.emittedRecords.get(0);
assertEquals(1, rw1.emittedRecords.size());
assertEquals(0, r1.getTuple(0).getField(0));
assertEquals(0, r1.getTuple(1).getField(0));
t.f0 = 1;
collector.collect(t);
StreamRecord r2 = rw1.emittedRecords.get(1);
assertEquals(2, rw1.emittedRecords.size());
assertEquals(1, r2.getTuple(0).getField(0));
assertEquals(1, r2.getTuple(1).getField(0));
assertEquals(0, rw2.emittedRecords.size());
t.f0 = 5;
collector.collect(t);
assertEquals(2, rw1.emittedRecords.size());
assertEquals(1, rw2.emittedRecords.size());
}
@Test
public void testClose() {
}
}
......@@ -15,20 +15,11 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import static org.junit.Assert.*;
import org.junit.Test;
import org.mockito.Mockito;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollectorTest {
......@@ -55,30 +46,13 @@ public class StreamCollectorTest {
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
Thread.sleep(200);
collector.collect(new Tuple1<Integer>(2));
collector.collect(new Tuple1<Integer>(3));
System.out.println("---------------");
}
@Test
public void recordWriter() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
ArrayList<RecordWriter<StreamRecord>> rwList = new ArrayList<RecordWriter<StreamRecord>>();
rwList.add(recWriter);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
collector.collect(new Tuple1<Integer>(6));
assertEquals((Integer) 3, recWriter.emittedRecords.get(0).getTuple(0).getField(0));
assertEquals((Integer) 6, recWriter.emittedRecords.get(1).getTuple(1).getField(0));
}
@Test
public void testClose() {
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.ArrayList;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class MockRecordWriter extends RecordWriter<StreamRecord> {
public ArrayList<StreamRecord> emittedRecords;
public MockRecordWriter(AbstractInputTask<?> inputBase, Class<StreamRecord> outputClass) {
super(inputBase, outputClass);
}
public boolean initList() {
emittedRecords = new ArrayList<StreamRecord>();
return true;
}
@Override
public void emit(StreamRecord record) {
emittedRecords.add(record.copy());
}
}
\ No newline at end of file
package eu.stratosphere.streaming.util;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import org.mockito.Mockito;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class MockRecordWriterFactory {
public static MockRecordWriter create() {
MockRecordWriter recWriter = mock(MockRecordWriter.class);
Mockito.when(recWriter.initList()).thenCallRealMethod();
doCallRealMethod().when(recWriter).emit(Mockito.any(StreamRecord.class));
recWriter.initList();
return recWriter;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册