/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.api;

import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import org.junit.Test;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
import eu.stratosphere.util.Collector;

/**
 * Builds a "dummy source -> map -> dummy sink" job and, for every vertex held by the job graph builder, reads the
 * serialized objects back out of the vertex configuration and prints diagnostic information about them.
 */
public class MapTest {

	/** Configuration key under which each vertex stores its serialized operator. */
	private static final String OPERATOR_KEY = "operator";

	/** Configuration key under which task and output vertices store their serialized invokable. */
	private static final String SERIALIZED_UDF_KEY = "serializedudf";

	/** Identity map over single-field string tuples; prints {@code "map"} on every invocation. */
	public static final class MyMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
		@Override
		public Tuple1<String> map(Tuple1<String> value) throws Exception {
			System.out.println("map");
			return value;
		}
	}

	/**
	 * Executes the job and inspects the configuration of every task, output and input vertex.
	 *
	 * @throws Exception
	 *         if execution, deserialization or the user function fails
	 */
	@Test
	public void test() throws Exception {
		StreamExecutionEnvironment context = new StreamExecutionEnvironment();

		// The resulting stream is not used afterwards, so it is not kept in a local variable.
		context.addDummySource().map(new MyMap()).addDummySink();

		context.execute();

		JobGraphBuilder jgb = context.jobGB();

		for (AbstractJobVertex c : jgb.components.values()) {
			// Three independent checks (not an else-if chain), mirroring the vertex kinds handled here.
			if (c instanceof JobTaskVertex) {
				checkTaskVertex(c.getConfiguration());
			}
			if (c instanceof JobOutputVertex) {
				checkOutputVertex(c.getConfiguration());
			}
			if (c instanceof JobInputVertex) {
				checkInputVertex(c.getConfiguration());
			}
		}
	}

	/** Deserializes the map operator of a task vertex, invokes it once, and checks the stored invokable. */
	private static void checkTaskVertex(Configuration config) throws Exception {
		System.out.println(config.getString("componentName", "default"));

		Object operator = deserialize(config, OPERATOR_KEY);
		assertTrue("operator of a task vertex must be a MapFunction", operator instanceof MapFunction);

		// Unchecked: the type arguments are erased at runtime; only the raw type was verified above.
		@SuppressWarnings("unchecked")
		MapFunction<Tuple, Tuple> mapper = (MapFunction<Tuple, Tuple>) operator;

		StreamCollector<Tuple> collector = new StreamCollector<Tuple>(1, 1000, 1, null);
		Tuple input = new Tuple1<String>("asd");
		collector.collect(mapper.map(input));

		System.out.println(mapper.getClass().getGenericSuperclass());

		TupleTypeInfo<?> typeInfo = (TupleTypeInfo<?>) TypeExtractor.createTypeInfo(MapFunction.class,
				mapper.getClass(), 0, null, null);
		System.out.println(typeInfo);

		Object invokable = deserialize(config, SERIALIZED_UDF_KEY);
		assertTrue("serialized udf of a task vertex must be a UserTaskInvokable",
				invokable instanceof UserTaskInvokable);
		System.out.println(invokable.getClass());

		System.out.println("----------------");
	}

	/** Deserializes the sink operator of an output vertex and checks the stored invokable. */
	private static void checkOutputVertex(Configuration config) throws Exception {
		System.out.println(config.getString("componentName", "default"));

		Object sink = deserialize(config, OPERATOR_KEY);
		assertTrue("operator of an output vertex must be a SinkFunction", sink instanceof SinkFunction);
		System.out.println(sink.getClass().getGenericSuperclass());

		TupleTypeInfo<?> typeInfo = (TupleTypeInfo<?>) TypeExtractor.createTypeInfo(SinkFunction.class,
				sink.getClass(), 0, null, null);
		System.out.println(typeInfo);

		Object invokable = deserialize(config, SERIALIZED_UDF_KEY);
		assertTrue("serialized udf of an output vertex must be a UserSinkInvokable",
				invokable instanceof UserSinkInvokable);
		System.out.println(invokable.getClass());

		System.out.println("----------------");
	}

	/** Deserializes the source invokable of an input vertex and prints its type information. */
	private static void checkInputVertex(Configuration config) throws Exception {
		System.out.println(config.getString("componentName", "default"));

		Object source = deserialize(config, OPERATOR_KEY);
		assertTrue("operator of an input vertex must be a UserSourceInvokable",
				source instanceof UserSourceInvokable);
		System.out.println(source.getClass().getGenericSuperclass());

		TupleTypeInfo<?> typeInfo = (TupleTypeInfo<?>) TypeExtractor.createTypeInfo(UserSourceInvokable.class,
				source.getClass(), 0, null, null);
		System.out.println(typeInfo);

		System.out.println("----------------");
	}

	/**
	 * Reads the byte array stored under {@code key} and deserializes a single object from it.
	 *
	 * @param config
	 *        the vertex configuration to read from
	 * @param key
	 *        the configuration key holding the serialized bytes
	 * @return the deserialized object
	 * @throws IOException
	 *         if the bytes are not a valid serialization stream
	 * @throws ClassNotFoundException
	 *         if the class of the serialized object cannot be loaded
	 */
	private static Object deserialize(Configuration config, String key) throws IOException,
			ClassNotFoundException {
		byte[] bytes = config.getBytes(key, null);
		// Fail with a readable message instead of an NPE from ByteArrayInputStream when the key is absent.
		assertTrue("no bytes stored under configuration key '" + key + "'", bytes != null);

		ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
		try {
			return in.readObject();
		} finally {
			in.close();
		}
	}
}