Commit 37a85a6b authored by: jfeher, committed by: Stephan Ewen

[streaming] refactored dummysources

Parent 7251f31d
pom.xml
@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
JobGraphBuilder.java
@@ -37,6 +37,7 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
@@ -51,7 +52,7 @@ import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
/**
* Object for building Flink stream processing job graphs
* Object for building Stratosphere stream processing job graphs
*/
public class JobGraphBuilder {
@@ -88,16 +89,15 @@ public class JobGraphBuilder {
}
/**
* Creates a new JobGraph with the given parameters
* Creates a new JobGraph with the given name with fault tolerance turned
* off
*
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Type of fault tolerance
* @param defaultBatchSize
* Default number of records to send at one emit
* @param defaultBatchTimeoutMillis
*/
public JobGraphBuilder(String jobGraphName) {
this(jobGraphName, FaultToleranceType.NONE);
}
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
int defaultBatchSize, long defaultBatchTimeoutMillis) {
@@ -107,118 +107,151 @@ public class JobGraphBuilder {
}
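// Reviewer sketch, not part of this commit: typical use of the two public
// constructors above. The batch size and timeout values are illustrative
// assumptions, not defaults taken from this code.
//
//   JobGraphBuilder simple = new JobGraphBuilder("wordCountStream");
//   JobGraphBuilder explicit = new JobGraphBuilder("wordCountStream",
//           FaultToleranceType.NONE, 10, 1000L);
//
// The one-argument form delegates to (jobGraphName, FaultToleranceType.NONE);
// the four-argument form additionally fixes the default batching parameters.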
/**
* Adds source to the JobGraph with the given parameters
* Adds source to the JobGraph by user defined object and serialized
* operator
*
* @param sourceName
* Name of the component
* @param InvokableObject
* User defined operator
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* @param parallelism
* Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setSource(sourceName, InvokableObject, parallelism,
subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction) {
setSource(sourceName, InvokableObject, operatorName, serializedFunction, 1, 1);
}
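// Reviewer sketch, not part of this commit: the overload above defaults
// parallelism and subtasksPerInstance to 1. With explicit values the call is
//
//   builder.setSource("mySource", sourceInvokable, "flatMap", serializedUdf, 4, 2);
//
// where sourceInvokable and serializedUdf are placeholders for objects
// produced by the API layer.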
/**
* Adds source to the JobGraph by user defined object with the set
* parallelism
*
* @param sourceName
* Name of the source component
* @param InvokableObject
* User defined UserSourceInvokable object or other predefined
* source object
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public Configuration setSource(String sourceName,
UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(sourceName, InvokableObject, parallelism,
subtasksPerInstance, source);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
}
return config;
}
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setTask(taskName, TaskInvokableObject, parallelism,
subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction) {
setTask(taskName, TaskInvokableObject, operatorName, serializedFunction, 1, 1);
}
/**
* Adds task to the JobGraph with the given parameters
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the component
* Name of the task component
* @param TaskInvokableObject
* User defined operator
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* User defined UserTaskInvokable object
* @param parallelism
* Number of parallel instances created
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of parallel instances on one task manager
* Number of subtasks allocated to a machine
* @return Configuration of the component
*/
public void setTask(String taskName,
public Configuration setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(taskName, TaskInvokableObject, parallelism,
subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
return config;
}
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setSink(sinkName, InvokableObject, parallelism, subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction) {
setSink(sinkName, InvokableObject, operatorName, serializedFunction, 1, 1);
}
/**
* Adds sink to the JobGraph with the given parameters
* Adds a sink component to the JobGraph with no parallelism
*
* @param sinkName
* Name of the component
* Name of the sink component
* @param InvokableObject
* User defined operator
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* User defined UserSinkInvokable object
* @param parallelism
* Number of parallel instances created
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of parallel instances on one task manager
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
public Configuration setSink(String sinkName,
UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(sinkName, InvokableObject, parallelism,
subtasksPerInstance, sink);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
}
return config;
}
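// Reviewer sketch, not part of this commit: wiring a complete
// source -> task -> sink graph with the setters above. All component names,
// invokables and serialized udf byte arrays below are placeholders.
//
//   JobGraphBuilder builder = new JobGraphBuilder("examplePipeline");
//   builder.setSource("source", sourceInvokable, "source", srcUdf, 1, 1);
//   builder.setTask("mapper", taskInvokable, "map", mapUdf, 4, 2);
//   builder.setSink("sink", sinkInvokable, "sink", sinkUdf, 1, 1);
//   // shuffle records into the parallel task, funnel everything into one sink
//   builder.shuffleConnect("source", "mapper");
//   builder.globalConnect("mapper", "sink");
//   builder.setBatchSize("mapper", 100); // see setBatchSize below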
/**
* Sets component parameters in the JobGraph
* Sets JobVertex configuration based on the given parameters
*
* @param componentName
* Name of the component
* @param component
* The component vertex
* @param InvokableObject
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
* @param serializedFunction
* Serialized operator
* @param InvokableClass
* Class of the user defined Invokable
* @param parallelism
* Number of parallel instances created
* Number of subtasks
* @param subtasksPerInstance
* Number of parallel instances on one task manager
* Number of subtasks per instance
* @param component
* AbstractJobVertex associated with the component
*/
private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism, int subtasksPerInstance) {
private Configuration setComponent(String componentName,
final Class<? extends StreamComponent> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
@@ -228,27 +261,49 @@
}
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableObject.getClass());
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
config.setLong("batchTimeout", batchTimeout);
// config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
addSerializedObject(InvokableObject, config);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
return config;
}
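// Reviewer sketch, not part of this commit: the runtime side can recover the
// values written above from the vertex Configuration. The getter signatures
// are assumed to mirror the setters used here; the default values are
// illustrative.
//
//   Class<?> userFunction = config.getClass("userfunction", null);
//   String componentName = config.getString("componentName", "unknown");
//   int batchSize = config.getInteger("batchSize", 1);
//   long batchTimeout = config.getLong("batchTimeout", 1000L);
//   int faultToleranceType = config.getInteger("faultToleranceType", 0);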
private Configuration setComponent(String componentName,
UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
private Configuration setComponent(String componentName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
private Configuration setComponent(String componentName,
UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
/**
* Sets the number of tuples batched together for higher throughput
*
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
@@ -260,11 +315,12 @@
*
* @param InvokableObject
* Invokable object to serialize
* @param config
* JobVertex configuration to which the serialized invokable will
* be added
* @param component
* JobVertex to which the serialized invokable will be added
*/
private void addSerializedObject(Serializable InvokableObject, Configuration config) {
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
@@ -283,31 +339,19 @@
}
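// Reviewer sketch, not part of this commit: the truncated body above
// presumably follows the standard Java serialization pattern, roughly
//
//   baos = new ByteArrayOutputStream();
//   oos = new ObjectOutputStream(baos);
//   oos.writeObject(InvokableObject);
//   config.setBytes("serializedudf", baos.toByteArray()); // key name assumed
//
// with any IOException logged or rethrown.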
/**
* Sets the udf operator from one component to another; used with some sinks.
*
* @param from
* @param to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
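// Reviewer sketch, not part of this commit: typical use of setBytesFrom, with
// assumed component names -- a print-style sink reuses the upstream
// operator's serialized udf:
//
//   builder.setBytesFrom("flatMap", "printSink");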
/**
* Connects two JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* Name of the downstream component, that will receive the
* records
* @param PartitionerClass
* Class of the partitioner
* @param channelType
* Channel Type
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
@@ -351,7 +395,7 @@
/**
* Sets all components to share instances with the one that has the highest parallelism
*/
private void setAutomaticInstanceSharing() {
public void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
@@ -439,13 +483,14 @@
/**
* Connects two components with the given names by global partitioning.
* <p>
* Global partitioning: sends all emitted tuples to one output instance
* Global partitioning: sends all emitted records to one output instance
* (i.e. the first one)
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* Name of the downstream component, that will receive the
* records
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
@@ -457,13 +502,14 @@
/**
* Connects two components with the given names by shuffle partitioning.
* <p>
* Shuffle partitioning: sends the output tuples to a randomly selected
* Shuffle partitioning: sends the output records to a randomly selected
* channel
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* Name of the downstream component, that will receive the
* records
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
@@ -471,13 +517,6 @@
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
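// Reviewer sketch, not part of this commit, contrasting the two public
// connect styles with assumed component names:
//
//   builder.globalConnect("mapper", "collector"); // all records to one instance
//   builder.shuffleConnect("source", "mapper");   // each record to a random channel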
/**
* Sets the number of instances for a given component, used for fault
* tolerance purposes
*
* @param upStreamComponentName
* @param numOfInstances
*/
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
@@ -528,4 +567,12 @@
return jobGraph;
}
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
}
FlatMapTest.java
@@ -15,19 +15,21 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
@@ -37,6 +39,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.MapTest.MyMap;
import eu.stratosphere.streaming.api.MapTest.MySink;
import eu.stratosphere.streaming.api.PrintTest.MyFlatMap;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
@@ -47,57 +50,193 @@ public class FlatMapTest {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0*value.f0));
public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0 * value.f0));
}
}
public static final class ParallelFlatMap extends
FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
numberOfElements++;
}
}
public static final class GenerateSequenceFlatMap extends
FlatMapFunction<Tuple1<Long>, Tuple1<Long>> {
@Override
public void flatMap(Tuple1<Long> value, Collector<Tuple1<Long>> out) throws Exception {
out.collect(new Tuple1<Long>(value.f0 * value.f0));
}
}
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
System.out.println("result " + tuple.f0);
}
}
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
public static final class FromElementsSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
public void invoke(Tuple1<Integer> tuple) {
fromElementsResult.add(tuple.f0);
}
}
public static final class FromCollectionSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
fromCollectionResult.add(tuple.f0);
}
}
public static final class GenerateSequenceSink extends SinkFunction<Tuple1<Long>> {
@Override
public void invoke(Tuple1<Long> tuple) {
generateSequenceResult.add(tuple.f0);
}
}
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
expected.add(i * i);
}
}
private static void fillFromElementsExpected() {
fromElementsExpected.add(4);
fromElementsExpected.add(25);
fromElementsExpected.add(81);
}
private static void fillSequenceSet() {
for (int i = 0; i < 10; i++) {
sequenceExpected.add(i * i);
}
}
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
private static void fillLongSequenceSet() {
for (int i = 0; i < 10; i++) {
sequenceLongExpected.add((long)(i * i));
}
}
private static void fillFromCollectionSet() {
if(fromCollectionSet.isEmpty()){
for (int i = 0; i < 10; i++) {
fromCollectionSet.add(i);
}
}
}
private static final int PARALELISM = 1;
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static int numberOfElements = 0;
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
private static Set<Integer> fromElementsExpected = new HashSet<Integer>();
private static Set<Integer> fromElementsResult = new HashSet<Integer>();
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static Set<Integer> sequenceExpected = new HashSet<Integer>();
private static Set<Long> sequenceLongExpected = new HashSet<Long>();
private static Set<Integer> fromCollectionResult = new HashSet<Integer>();
private static Set<Long> generateSequenceResult = new HashSet<Long>();
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment(2, 1000);
DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
.flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
env.execute();
fillExpectedList();
assertTrue(expected.equals(result));
}
@Test
public void parallelShuffleconnectTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> source = env.fromCollection(fromCollectionSet);
DataStream<Tuple1<Integer>> map = source.flatMap(new ParallelFlatMap(), 1).addSink(
new MySink());
DataStream<Tuple1<Integer>> map2 = source.flatMap(new ParallelFlatMap(), 1).addSink(
new MySink());
env.execute();
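// both ParallelFlatMap instances consume all 10 collection elements: 2 * 10 = 20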
assertEquals(20, numberOfElements);
numberOfElements=0;
}
@Test
public void fromElementsTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> map = env.fromElements(2, 5, 9).flatMap(new MyFlatMap(), 1);
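// MyFlatMap squares each element: {2, 5, 9} -> {4, 25, 81}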
DataStream<Tuple1<Integer>> sink = map.addSink(new FromElementsSink());
fillFromElementsExpected();
env.execute();
assertEquals(fromElementsExpected, fromElementsResult);
}
@Test
public void fromCollectionTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> map = env.fromCollection(fromCollectionSet).flatMap(
new MyFlatMap(), 1);
DataStream<Tuple1<Integer>> sink = map.addSink(new FromCollectionSink());
fillSequenceSet();
env.execute();
assertEquals(sequenceExpected, fromCollectionResult);
}
@Test
public void generateSequenceTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Long>> map = env.generateSequence(0, 9).flatMap(
new GenerateSequenceFlatMap(), 1);
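// generateSequence(0, 9) emits 0..9 as Tuple1<Long>; GenerateSequenceFlatMap squares them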
DataStream<Tuple1<Long>> sink = map.addSink(new GenerateSequenceSink());
fillLongSequenceSet();
env.execute();
assertEquals(sequenceLongExpected, generateSequenceResult);
}
}
MapTest.java
@@ -18,7 +18,9 @@ package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
@@ -33,27 +35,6 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("source "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MyFieldsSource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < MAXSOURCE; i++) {
collector.collect(new Tuple1<Integer>(5));
}
}
}
public static final class MyDiffFieldsSource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 9; i++) {
collector.collect(new Tuple1<Integer>(i));
}
}
@@ -63,7 +44,6 @@
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
System.out.println("mymap "+map);
map++;
return new Tuple1<Integer>(value.f0 * value.f0);
}
@@ -76,7 +56,6 @@
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
counter++;
if (counter == MAXSOURCE)
allInOne = true;
return new Tuple1<Integer>(value.f0 * value.f0);
@@ -140,13 +119,12 @@
@Override
public void invoke(Tuple1<Integer> tuple) {
System.out.println("sink "+graphResult);
graphResult++;
}
}
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
private static int broadcastResult = 0;
private static int shuffleResult = 0;
private static int fieldsResult = 0;
@@ -157,19 +135,49 @@
private static final int MAXSOURCE = 10;
private static boolean allInOne = false;
private static boolean threeInAll = true;
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
expected.add(i * i);
}
}
private static void fillFromCollectionSet() {
if(fromCollectionSet.isEmpty()){
for (int i = 0; i < 10; i++) {
fromCollectionSet.add(i);
}
}
}
private static void fillFromCollectionFieldsSet() {
if(fromCollectionFields.isEmpty()){
for (int i = 0; i < MAXSOURCE; i++) {
fromCollectionFields.add(5);
}
}
}
private static void fillFromCollectionDiffFieldsSet() {
if(fromCollectionDiffFieldsSet.isEmpty()){
for (int i = 0; i < 9; i++) {
fromCollectionDiffFieldsSet.add(i);
}
}
}
@Test
public void mapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(), 1)
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
.map(new MyMap(), PARALELISM).addSink(new MySink());
env.execute();
@@ -182,8 +190,11 @@
@Test
public void broadcastSinkTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MySource(), 1)
.fromCollection(fromCollectionSet)
.broadcast()
.map(new MyMap(), 3)
.addSink(new MyBroadcastSink());
@@ -196,8 +207,11 @@
@Test
public void shuffleSinkTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MySource(), 1)
.fromCollection(fromCollectionSet)
.map(new MyMap(), 3)
.addSink(new MyShufflesSink());
env.execute();
@@ -222,8 +236,11 @@
@Test
public void fieldsMapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionFieldsSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MyFieldsSource(), 1)
.fromCollection(fromCollectionFields)
.partitionBy(0)
.map(new MyFieldsMap(), 3)
.addSink(new MyFieldsSink());
@@ -236,8 +253,11 @@
@Test
public void diffFieldsMapTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionDiffFieldsSet();
DataStream<Tuple1<Integer>> dataStream = env
.addSource(new MyDiffFieldsSource(), 1)
.fromCollection(fromCollectionDiffFieldsSet)
.partitionBy(0)
.map(new MyDiffFieldsMap(), 3)
.addSink(new MyDiffFieldsSink());
......
PrintTest.java
@@ -29,8 +29,6 @@ public class PrintTest {
public static final class MyFlatMap extends
FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
throws Exception {
@@ -53,4 +51,6 @@
env.execute();
}
}