提交 f0c57317 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] RMQ source for multiple types

上级 df9dac28
...@@ -5,42 +5,18 @@ ...@@ -5,42 +5,18 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<version>0.3-SNAPSHOT</version> <version>0.2-SNAPSHOT</version>
<artifactId>stratosphere-streaming</artifactId> <artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name> <name>stratosphere-streaming</name>
<packaging>pom</packaging>
<url>http://github.com/stratosphere/stratosphere-streaming</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>stratosphere</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/stratosphere/stratosphere-streaming</url>
<connection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</connection>
<developerConnection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</developerConnection>
</scm>
<developers> <packaging>jar</packaging>
</developers>
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
<module>stratosphere-streaming-addons</module>
</modules>
<properties> <properties>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version> <stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties> </properties>
<repositories> <repositories>
<repository> <repository>
<id>dms-repo</id> <id>dms-repo</id>
...@@ -49,54 +25,47 @@ ...@@ -49,54 +25,47 @@
<snapshots><enabled>true</enabled></snapshots> <snapshots><enabled>true</enabled></snapshots>
</repository> </repository>
</repositories> </repositories>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId> <artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId> <artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId> <artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId> <artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId> <artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId> <artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.7</version> <version>4.7</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.3.2</version> <version>3.3.2</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
...@@ -109,14 +78,27 @@ ...@@ -109,14 +78,27 @@
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
<version>1.2.16</version> <version>1.2.16</version>
<type>jar</type> </dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<version>1.8.5</version> <version>1.8.5</version>
<scope>test</scope> <scope>test</scope>
<type>jar</type>
</dependency> </dependency>
</dependencies> </dependencies>
......
...@@ -12,12 +12,13 @@ ...@@ -12,12 +12,13 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api;
import java.util.Iterator; import java.util.Iterator;
import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
...@@ -31,7 +32,6 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U ...@@ -31,7 +32,6 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U
@Override @Override
public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception { public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception {
@SuppressWarnings("unchecked")
Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator(); Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator();
reducer.reduce(iterator, collector); reducer.reduce(iterator, collector);
} }
......
...@@ -24,11 +24,6 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction; ...@@ -24,11 +24,6 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType; import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.invokable.BatchReduceInvokable;
import eu.stratosphere.streaming.api.invokable.FilterInvokable;
import eu.stratosphere.streaming.api.invokable.FlatMapInvokable;
import eu.stratosphere.streaming.api.invokable.MapInvokable;
import eu.stratosphere.types.TypeInformation; import eu.stratosphere.types.TypeInformation;
public class DataStream<T extends Tuple> { public class DataStream<T extends Tuple> {
...@@ -198,29 +193,6 @@ public class DataStream<T extends Tuple> { ...@@ -198,29 +193,6 @@ public class DataStream<T extends Tuple> {
return returnStream; return returnStream;
} }
/**
* Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call
* returns exactly one element.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param parallelism
* The number of threads the function runs on.
* @param <R>
* output type
* @return The transformed DataStream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
parallelism);
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return map(mapper, 1);
}
/** /**
* Applies a FlatMap transformation on a DataStream. The transformation * Applies a FlatMap transformation on a DataStream. The transformation
* calls a FlatMapFunction for each element of the DataSet. Each * calls a FlatMapFunction for each element of the DataSet. Each
...@@ -241,30 +213,23 @@ public class DataStream<T extends Tuple> { ...@@ -241,30 +213,23 @@ public class DataStream<T extends Tuple> {
new FlatMapInvokable<T, R>(flatMapper), parallelism); new FlatMapInvokable<T, R>(flatMapper), parallelism);
} }
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return flatMap(flatMapper, 1);
}
/** /**
* Applies a Filter transformation on a DataStream. The transformation calls * Applies a Map transformation on a DataStream. The transformation calls a
* a FilterFunction for each element of the DataStream and retains only * MapFunction for each element of the DataStream. Each MapFunction call
* those element for which the function returns true. Elements for which the * returns exactly one element.
* function returns false are filtered.
* *
* @param filter * @param mapper
* The FilterFunction that is called for each element of the * The MapFunction that is called for each element of the
* DataSet. * DataStream.
* @param parallelism * @param parallelism
* The number of threads the function runs on. * The number of threads the function runs on.
* @return The filtered DataStream. * @param <R>
* output type
* @return The transformed DataStream.
*/ */
public DataStream<T> filter(FilterFunction<T> filter, int parallelism) { public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("filter", this.copy(), filter, return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
new FilterInvokable<T>(filter), parallelism); parallelism);
}
public DataStream<T> filter(FilterFunction<T> filter) {
return filter(filter, 1);
} }
/** /**
...@@ -290,9 +255,22 @@ public class DataStream<T extends Tuple> { ...@@ -290,9 +255,22 @@ public class DataStream<T extends Tuple> {
new BatchReduceInvokable<T, R>(reducer), parallelism); new BatchReduceInvokable<T, R>(reducer), parallelism);
} }
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer, /**
int batchSize) { * Applies a Filter transformation on a DataStream. The transformation calls
return batchReduce(reducer, batchSize, 1); * a FilterFunction for each element of the DataStream and retains only
* those element for which the function returns true. Elements for which the
* function returns false are filtered.
*
* @param filter
* The FilterFunction that is called for each element of the
* DataSet.
* @param parallelism
* The number of threads the function runs on.
* @return The filtered DataStream.
*/
public DataStream<T> filter(FilterFunction<T> filter, int parallelism) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter), parallelism);
} }
/** /**
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.function; package eu.stratosphere.streaming.api;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.function; package eu.stratosphere.streaming.api;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
......
...@@ -12,27 +12,24 @@ ...@@ -12,27 +12,24 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.functions.FilterFunction; import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> { public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
FilterFunction<IN> filterFunction; FilterFunction<IN> filterFunction;
public FilterInvokable(FilterFunction<IN> filterFunction) { public FilterInvokable(FilterFunction<IN> filterFunction) {
this.filterFunction = filterFunction; this.filterFunction = filterFunction;
} }
@Override @Override
public void invoke(StreamRecord record, Collector<IN> collector) throws Exception { public void invoke(StreamRecord record, Collector<IN> collector) throws Exception {
for (int i = 0; i < record.getBatchSize(); i++) { for (int i = 0; i < record.getBatchSize(); i++) {
@SuppressWarnings("unchecked")
IN tuple = (IN) record.getTuple(i); IN tuple = (IN) record.getTuple(i);
if (filterFunction.filter(tuple)) { if (filterFunction.filter(tuple)) {
collector.collect(tuple); collector.collect(tuple);
......
...@@ -13,10 +13,11 @@ ...@@ -13,10 +13,11 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -28,6 +28,8 @@ import org.apache.commons.logging.LogFactory; ...@@ -28,6 +28,8 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex; import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
...@@ -35,8 +37,6 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex; ...@@ -35,8 +37,6 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex; import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig; import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
...@@ -63,8 +63,8 @@ public class JobGraphBuilder { ...@@ -63,8 +63,8 @@ public class JobGraphBuilder {
protected String maxParallelismVertexName; protected String maxParallelismVertexName;
protected int maxParallelism; protected int maxParallelism;
protected FaultToleranceType faultToleranceType; protected FaultToleranceType faultToleranceType;
private int batchSize = 1; private int batchSize;
private long batchTimeout = 1000; private long batchTimeout;
/** /**
* Creates a new JobGraph with the given name * Creates a new JobGraph with the given name
...@@ -87,12 +87,24 @@ public class JobGraphBuilder { ...@@ -87,12 +87,24 @@ public class JobGraphBuilder {
this.faultToleranceType = faultToleranceType; this.faultToleranceType = faultToleranceType;
} }
public void setDefaultBatchSize(int batchSize) { /**
this.batchSize = batchSize; * Creates a new JobGraph with the given parameters
} *
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Type of fault tolerance
* @param defaultBatchSize
* Default number of records to send at one emit
* @param defaultBatchTimeoutMillis
* defaultBatchTimeoutMillis
*/
public void setBatchTimeout(int timeout) { public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
this.batchTimeout = timeout; int defaultBatchSize, long defaultBatchTimeoutMillis) {
this(jobGraphName, faultToleranceType);
this.batchSize = defaultBatchSize;
this.batchTimeout = defaultBatchTimeoutMillis;
} }
/** /**
...@@ -108,16 +120,18 @@ public class JobGraphBuilder { ...@@ -108,16 +120,18 @@ public class JobGraphBuilder {
* Serialized udf * Serialized udf
* @param parallelism * @param parallelism
* Number of parallel instances created * Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/ */
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject, public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) { String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph); final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInvokableClass(StreamSource.class); source.setInputClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction, setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism); parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName); log.debug("SOURCE: " + sourceName);
...@@ -137,15 +151,17 @@ public class JobGraphBuilder { ...@@ -137,15 +151,17 @@ public class JobGraphBuilder {
* Serialized udf * Serialized udf
* @param parallelism * @param parallelism
* Number of parallel instances created * Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/ */
public void setTask(String taskName, public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject, UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) { String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph); final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setInvokableClass(StreamTask.class); task.setTaskClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction, setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism); parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName); log.debug("TASK: " + taskName);
...@@ -165,13 +181,16 @@ public class JobGraphBuilder { ...@@ -165,13 +181,16 @@ public class JobGraphBuilder {
* Serialized udf * Serialized udf
* @param parallelism * @param parallelism
* Number of parallel instances created * Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/ */
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject, public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) { String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph); final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setInvokableClass(StreamSink.class); sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, parallelism); setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName); log.debug("SINK: " + sinkName);
...@@ -199,11 +218,10 @@ public class JobGraphBuilder { ...@@ -199,11 +218,10 @@ public class JobGraphBuilder {
*/ */
private void setComponent(String componentName, AbstractJobVertex component, private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction, Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism) { int parallelism, int subtasksPerInstance) {
component.setNumberOfSubtasks(parallelism); component.setNumberOfSubtasks(parallelism);
// TODO remove all NumberOfSubtasks setting component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
// component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) { if (parallelism > maxParallelism) {
maxParallelism = parallelism; maxParallelism = parallelism;
...@@ -224,6 +242,20 @@ public class JobGraphBuilder { ...@@ -224,6 +242,20 @@ public class JobGraphBuilder {
numberOfInstances.put(componentName, parallelism); numberOfInstances.put(componentName, parallelism);
} }
/**
* Sets the number of tuples batched together for higher throughput
*
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/** /**
* Adds serialized invokable object to the JobVertex configuration * Adds serialized invokable object to the JobVertex configuration
* *
...@@ -234,36 +266,105 @@ public class JobGraphBuilder { ...@@ -234,36 +266,105 @@ public class JobGraphBuilder {
* be added * be added
*/ */
private void addSerializedObject(Serializable InvokableObject, Configuration config) { private void addSerializedObject(Serializable InvokableObject, Configuration config) {
ByteArrayOutputStream baos = null; ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null; ObjectOutputStream oos = null;
try { try {
baos = new ByteArrayOutputStream(); baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos); oos = new ObjectOutputStream(baos);
oos.writeObject(InvokableObject); oos.writeObject(InvokableObject);
config.setBytes("serializedudf", baos.toByteArray()); config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass()); System.out.println("Serialization error " + InvokableObject.getClass());
} }
} }
/** /**
* Sets the number of tuples batched together for higher throughput * Sets udf operator from one component to another, used with some sinks.
* *
* @param componentName * @param from
* Name of the component * from
* @param batchSize * @param to
* Number of tuples batched together * to
*/ */
public void setBatchSize(String componentName, int batchSize) { public void setBytesFrom(String from, String to) {
Configuration config = components.get(componentName).getConfiguration(); Configuration fromConfig = components.get(from).getConfiguration();
config.setInteger("batchSize_" Configuration toConfig = components.get(to).getConfiguration();
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
} }
/** /**
...@@ -301,28 +402,28 @@ public class JobGraphBuilder { ...@@ -301,28 +402,28 @@ public class JobGraphBuilder {
*/ */
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
int keyPosition) { int keyPosition) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName); AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName); AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try { try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK); upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration()) Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration(); .getConfiguration();
config.setClass( config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1), "partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
FieldsPartitioner.class); FieldsPartitioner.class);
config.setInteger( config.setInteger(
"partitionerIntParam_" "partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition); + (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger("numOfOutputs_" config.setInteger("numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), + (upStreamComponent.getNumberOfForwardConnections() - 1),
numberOfInstances.get(downStreamComponentName)); numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1); addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> " log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
...@@ -336,7 +437,7 @@ public class JobGraphBuilder { ...@@ -336,7 +437,7 @@ public class JobGraphBuilder {
} }
log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName
+ " on " + keyPosition); + " on " + keyPosition);
} }
/** /**
...@@ -354,7 +455,7 @@ public class JobGraphBuilder { ...@@ -354,7 +455,7 @@ public class JobGraphBuilder {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class); connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
addOutputChannels(upStreamComponentName, 1); addOutputChannels(upStreamComponentName, 1);
log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName); log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
} }
/** /**
...@@ -374,89 +475,6 @@ public class JobGraphBuilder { ...@@ -374,89 +475,6 @@ public class JobGraphBuilder {
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName); log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
} }
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* from
* @param to
* to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
}
/** /**
* Sets the number of instances for a given component, used for fault * Sets the number of instances for a given component, used for fault
* tolerance purposes * tolerance purposes
......
...@@ -13,10 +13,11 @@ ...@@ -13,10 +13,11 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.function; package eu.stratosphere.streaming.api;
import java.io.Serializable; import java.io.Serializable;
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.function; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
...@@ -13,11 +13,13 @@ ...@@ -13,11 +13,13 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter; import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class StreamCollector<T extends Tuple> implements Collector<T> { public class StreamCollector<T extends Tuple> implements Collector<T> {
......
...@@ -13,14 +13,15 @@ ...@@ -13,14 +13,15 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamrecord; package eu.stratosphere.streaming.api;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class StreamCollectorManager<T extends Tuple> implements Collector<T> { public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
...@@ -53,7 +54,6 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> { ...@@ -53,7 +54,6 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
} }
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
@SuppressWarnings("unchecked")
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)]; StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
for (int j = 0; j < collectors.length; j++) { for (int j = 0; j < collectors.length; j++) {
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i), collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
......
...@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.api.invokable; ...@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable; import java.io.Serializable;
public abstract class StreamComponentInvokable implements Serializable { public abstract class StreamComponent implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -20,10 +20,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; ...@@ -20,10 +20,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponentInvokable { StreamComponent {
private static final long serialVersionUID = 1L;
public abstract void invoke(StreamRecord record, Collector<OUT> collector) public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception; throws Exception;
} }
...@@ -20,7 +20,7 @@ import java.io.Serializable; ...@@ -20,7 +20,7 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponentInvokable implements public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponent implements
Serializable { Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -16,8 +16,10 @@ ...@@ -16,8 +16,10 @@
package eu.stratosphere.streaming.api.streamcomponent; package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
...@@ -33,26 +35,39 @@ import eu.stratosphere.api.java.typeutils.TupleTypeInfo; ...@@ -33,26 +35,39 @@ import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader; import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.runtime.io.api.ChannelSelector; import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.runtime.io.api.MutableRecordReader; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.runtime.io.api.RecordWriter; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamComponentInvokable; import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable; import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord; import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamCollectorManager;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public abstract class AbstractStreamComponent extends AbstractInvokable { public final class StreamComponentHelper<T extends AbstractInvokable> {
private final Log log = LogFactory.getLog(AbstractStreamComponent.class); private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
private static int numComponents = 0;
private TupleTypeInfo<Tuple> inTupleTypeInfo = null; private TupleTypeInfo<Tuple> inTupleTypeInfo = null;
private TupleSerializer<Tuple> inTupleSerializer = null; private TupleSerializer<Tuple> inTupleSerializer = null;
...@@ -62,6 +77,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -62,6 +77,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
private TupleSerializer<Tuple> outTupleSerializer = null; private TupleSerializer<Tuple> outTupleSerializer = null;
private SerializationDelegate<Tuple> outSerializationDelegate = null; private SerializationDelegate<Tuple> outSerializationDelegate = null;
public Collector<Tuple> collector;
private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>(); private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>();
private List<Integer> batchSizesPartitioned = new ArrayList<Integer>(); private List<Integer> batchSizesPartitioned = new ArrayList<Integer>();
private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>(); private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>();
...@@ -70,34 +86,49 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -70,34 +86,49 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>(); private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>(); private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
protected Configuration configuration; public static int newComponent() {
protected Collector<Tuple> collector;
protected int instanceID;
protected String name;
private static int numComponents = 0;
protected static int newComponent() {
numComponents++; numComponents++;
return numComponents; return numComponents;
} }
protected void initialize() { public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
configuration = getTaskConfiguration(); List<RecordWriter<StreamRecord>> outputs) {
name = configuration.getString("componentName", "MISSING_COMPONENT_NAME");
EventListener[] ackListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
ackListeners[i] = new AckEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(ackListeners[i], AckEvent.class);
}
} }
protected Collector<Tuple> setCollector(List<RecordWriter<StreamRecord>> outputs) { public void setFailListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
long batchTimeout = configuration.getLong("batchTimeout", 1000); List<RecordWriter<StreamRecord>> outputs) {
EventListener[] failListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
failListeners[i] = new FailEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(failListeners[i], FailEvent.class);
}
}
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned, collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned,
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, id,
instanceID, outSerializationDelegate, outputsPartitioned, outputsNotPartitioned); outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
return collector; return collector;
} }
protected void setSerializers() { public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = configuration.getBytes("operator", null); byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = configuration.getString("operatorName", ""); String operatorName = taskConfiguration.getString("operatorName", "");
Object function = null; Object function = null;
try { try {
...@@ -154,7 +185,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -154,7 +185,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer); outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
} }
protected void setSinkSerializer() { public void setSinkSerializer() {
if (outSerializationDelegate != null) { if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo; inTupleTypeInfo = outTupleTypeInfo;
...@@ -163,40 +194,59 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -163,40 +194,59 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
} }
} }
protected AbstractRecordReader getConfigInputs() public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException { throws StreamComponentException {
int numberOfInputs = configuration.getInteger("numberOfInputs", 0); int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
if (numberOfInputs < 2) { if (numberOfInputs < 2) {
if (taskBase instanceof StreamTask) {
return new StreamRecordReader(this, ArrayStreamRecord.class, return new StreamRecordReader((StreamTask) taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer); inDeserializationDelegate, inTupleSerializer);
} else if (taskBase instanceof StreamSink) {
return new StreamRecordReader((StreamSink) taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
} else { } else {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs]; MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs];
for (int i = 0; i < numberOfInputs; i++) { for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<StreamRecord>(this); if (taskBase instanceof StreamTask) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamTask) taskBase);
} else if (taskBase instanceof StreamSink) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamSink) taskBase);
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigInputs");
}
} }
return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class, return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer); inDeserializationDelegate, inTupleSerializer);
} }
} }
protected void setConfigOutputs(List<RecordWriter<StreamRecord>> outputs, public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException { List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = configuration.getInteger("numberOfOutputs", 0); int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) { for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(i, partitioners); setPartitioner(taskConfiguration, i, partitioners);
ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i); ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
outputs.add(new RecordWriter<StreamRecord>(this, outputPartitioner)); if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputsPartitioned.size() < batchSizesPartitioned.size()) { if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i)); outputsPartitioned.add(outputs.get(i));
} else { } else {
...@@ -205,22 +255,92 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -205,22 +255,92 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
} }
} }
private void setPartitioner(int numberOfOutputs, /**
List<ChannelSelector<StreamRecord>> partitioners) { * Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param config
* Configuration object
* @return The StreamComponent object
*/
private StreamComponent getInvokable(Class<? extends StreamComponent> userFunctionClass,
Configuration config) {
StreamComponent userFunction = null;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamComponent) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
@SuppressWarnings("rawtypes")
public UserSinkInvokable getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable) getInvokable(userFunctionClass, config);
}
// TODO consider logging stack trace!
@SuppressWarnings("rawtypes")
public UserTaskInvokable getTaskInvokable(Configuration config) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = configuration.getClass( // Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable) getInvokable(userFunctionClass, config);
}
@SuppressWarnings("rawtypes")
public UserSourceInvokable getSourceInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable) getInvokable(userFunctionClass, config);
}
// TODO find a better solution for this
public void threadSafePublish(AbstractTaskEvent event, AbstractRecordReader inputs)
throws InterruptedException, IOException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
inputs.publishEvent(event);
concurrentModificationOccured = true;
} catch (ConcurrentModificationException exeption) {
if (log.isTraceEnabled()) {
log.trace("Waiting to publish " + event.getClass());
}
}
}
}
private void setPartitioner(Configuration config, int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class, "partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class); ChannelSelector.class);
Integer batchSize = configuration.getInteger("batchSize_" + numberOfOutputs, 1); Integer batchSize = config.getInteger("batchSize_" + numberOfOutputs, 1);
try { try {
if (partitioner.equals(FieldsPartitioner.class)) { if (partitioner.equals(FieldsPartitioner.class)) {
batchSizesPartitioned.add(batchSize); batchSizesPartitioned.add(batchSize);
numOfOutputsPartitioned.add(configuration numOfOutputsPartitioned.add(config
.getInteger("numOfOutputs_" + numberOfOutputs, -1)); .getInteger("numOfOutputs_" + numberOfOutputs, -1));
// TODO:force one partitioning field // TODO:force one partitioning field
keyPosition = configuration.getInteger("partitionerIntParam_" + numberOfOutputs, 1); keyPosition = config.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition)); partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
...@@ -240,8 +360,8 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -240,8 +360,8 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
} }
} }
protected void invokeRecords(StreamRecordInvokable<Tuple, Tuple> userFunction, public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
AbstractRecordReader inputs) throws Exception { throws Exception {
if (inputs instanceof UnionStreamRecordReader) { if (inputs instanceof UnionStreamRecordReader) {
UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs; UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs;
while (recordReader.hasNext()) { while (recordReader.hasNext()) {
...@@ -258,75 +378,5 @@ public abstract class AbstractStreamComponent extends AbstractInvokable { ...@@ -258,75 +378,5 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
} }
} }
} }
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @return The StreamComponent object
*/
protected StreamComponentInvokable getInvokable(Class<? extends StreamComponentInvokable> userFunctionClass) {
StreamComponentInvokable userFunction = null;
byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
try { }
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( \ No newline at end of file
userFunctionSerialized));
userFunction = (StreamComponentInvokable) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
protected abstract void setInvokable();
// protected void threadSafePublish(AbstractTaskEvent event,
// AbstractRecordReader inputs)
// throws InterruptedException, IOException {
//
// boolean concurrentModificationOccured = false;
// while (!concurrentModificationOccured) {
// try {
// inputs.publishEvent(event);
// concurrentModificationOccured = true;
// } catch (ConcurrentModificationException exeption) {
// if (log.isTraceEnabled()) {
// log.trace("Waiting to publish " + event.getClass());
// }
// }
// }
// }
//
// protected void setAckListener(FaultToleranceUtil recordBuffer, int
// sourceInstanceID,
// List<RecordWriter<StreamRecord>> outputs) {
//
// EventListener[] ackListeners = new EventListener[outputs.size()];
//
// for (int i = 0; i < outputs.size(); i++) {
// ackListeners[i] = new AckEventListener(sourceInstanceID, recordBuffer,
// i);
// outputs.get(i).subscribeToEvent(ackListeners[i], AckEvent.class);
// }
//
// }
//
// protected void setFailListener(FaultToleranceUtil recordBuffer, int
// sourceInstanceID,
// List<RecordWriter<StreamRecord>> outputs) {
//
// EventListener[] failListeners = new EventListener[outputs.size()];
//
// for (int i = 0; i < outputs.size(); i++) {
// failListeners[i] = new FailEventListener(sourceInstanceID, recordBuffer,
// i);
// outputs.get(i).subscribeToEvent(failListeners[i], FailEvent.class);
// }
// }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent<OUT extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
private List<RecordWriter<StreamRecord>> outputs;
protected int channelID;
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void setPerfCounterDir(String dir) {
performanceCounter.setFname(dir + "/" + name + channelID);
}
public final void emit(StreamRecord record) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
if (log.isInfoEnabled()) {
log.info("EMITTED: " + record.getId() + " -- " + name);
}
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
if (log.isWarnEnabled()) {
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
}
// TODO: Should we fail record at exception catch?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
}
}
}
public String getResult() {
return "Override getResult() to pass your own results";
}
}
/*********************************************************************************************************************** /***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) *
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
...@@ -9,6 +10,7 @@ ...@@ -9,6 +10,7 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent; package eu.stratosphere.streaming.api.streamcomponent;
...@@ -17,11 +19,13 @@ import java.io.IOException; ...@@ -17,11 +19,13 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/** /**
...@@ -35,6 +39,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -35,6 +39,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
private final Class<? extends StreamRecord> recordType; private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<Tuple> deserializationDelegate; private DeserializationDelegate<Tuple> deserializationDelegate;
private TupleSerializer<Tuple> tupleSerializer; private TupleSerializer<Tuple> tupleSerializer;
/** /**
* Stores the last read record. * Stores the last read record.
*/ */
...@@ -57,14 +62,41 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -57,14 +62,41 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
* @param recordType * @param recordType
* The class of records that can be read from the record reader. * The class of records that can be read from the record reader.
* @param deserializationDelegate * @param deserializationDelegate
* deserializationDelegate * Deserialization delegate
* @param tupleSerializer
* Tuple serializer
*/
public StreamRecordReader(AbstractTask taskBase, Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord>
// get(), 0);
super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
/**
* Constructs a new record reader and registers a new input gate with the
* application's environment.
*
* @param outputBase
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
* @param deserializationDelegate
* Deserialization delegate
* @param tupleSerializer * @param tupleSerializer
* tupleSerializer * Tuple serializer
*/ */
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType, public StreamRecordReader(AbstractOutputTask outputBase,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate, DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) { TupleSerializer<Tuple> tupleSerializer) {
super(taskBase); // super(outputBase, MutableRecordDeserializerFactory.<StreamRecord>
// get(), 0);
super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType; this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate; this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer; this.tupleSerializer = tupleSerializer;
...@@ -101,11 +133,10 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -101,11 +133,10 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
return true; return true;
case END_OF_SUPERSTEP: case END_OF_SUPERSTEP:
if (incrementEndOfSuperstepEventAndCheck()) { if (incrementEndOfSuperstepEventAndCheck())
return false; return false;
} else { else
break; // fall through and wait for next record/event break; // fall through and wait for next record/event
}
case TASK_EVENT: case TASK_EVENT:
handleEvent(this.inputGate.getCurrentEvent()); handleEvent(this.inputGate.getCurrentEvent());
......
...@@ -18,51 +18,46 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -18,51 +18,46 @@ package eu.stratosphere.streaming.api.streamcomponent;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.runtime.io.api.AbstractRecordReader; import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractStreamComponent { public class StreamSink extends AbstractOutputTask {
private static final Log log = LogFactory.getLog(StreamSink.class); private static final Log log = LogFactory.getLog(StreamSink.class);
private AbstractRecordReader inputs; private AbstractRecordReader inputs;
private StreamRecordInvokable<Tuple, Tuple> userFunction; private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
public StreamSink() { public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here // TODO: Make configuration file visible and call setClassInputs() here
userFunction = null; userFunction = null;
streamSinkHelper = new StreamComponentHelper<StreamSink>();
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
initialize(); Configuration taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try { try {
setSerializers(); streamSinkHelper.setSerializers(taskConfiguration);
setSinkSerializer(); streamSinkHelper.setSinkSerializer();
inputs = getConfigInputs(); inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
} catch (Exception e) { } catch (Exception e) {
if (log.isErrorEnabled()) { if (log.isErrorEnabled()) {
log.error("Cannot register inputs", e); log.error("Cannot register inputs", e);
} }
} }
// FaultToleranceType faultToleranceType = FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
// FaultToleranceType.from(taskConfiguration .getInteger("faultToleranceType", 0));
// .getInteger("faultToleranceType", 0));
setInvokable();
}
@SuppressWarnings({ "rawtypes", "unchecked" }) userFunction = streamSinkHelper.getSinkInvokable(taskConfiguration);
@Override
protected void setInvokable() {
Class<? extends UserSinkInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
userFunction = (UserSinkInvokable<Tuple>) getInvokable(userFunctionClass);
} }
@Override @Override
...@@ -70,12 +65,9 @@ public class StreamSink extends AbstractStreamComponent { ...@@ -70,12 +65,9 @@ public class StreamSink extends AbstractStreamComponent {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoked"); log.debug("SINK " + name + " invoked");
} }
streamSinkHelper.invokeRecords(userFunction, inputs);
invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoke finished"); log.debug("SINK " + name + " invoke finished");
} }
} }
} }
...@@ -21,85 +21,82 @@ import java.util.List; ...@@ -21,85 +21,82 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.runtime.io.api.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractStreamComponent { public class StreamSource extends AbstractInputTask<DummyIS> {
private static final Log log = LogFactory.getLog(StreamSource.class); private static final Log log = LogFactory.getLog(StreamSource.class);
private List<RecordWriter<StreamRecord>> outputs; private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners; private List<ChannelSelector<StreamRecord>> partitioners;
private UserSourceInvokable<Tuple> userFunction; private UserSourceInvokable userFunction;
private static int numSources; private static int numSources;
private int[] numberOfOutputChannels; private int sourceInstanceID;
// private FaultToleranceUtil recordBuffer; private String name;
// private FaultToleranceType faultToleranceType; private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() { public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<StreamRecord>>(); outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>(); partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null; userFunction = null;
numSources = newComponent(); streamSourceHelper = new StreamComponentHelper<StreamSource>();
instanceID = numSources; numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
}
@Override
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<DummyIS> getInputSplitType() {
return null;
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
initialize(); Configuration taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try { try {
setSerializers(); streamSourceHelper.setSerializers(taskConfiguration);
setConfigOutputs(outputs, partitioners); streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
setCollector(outputs); streamSourceHelper.setCollector(taskConfiguration, sourceInstanceID, outputs);
} catch (StreamComponentException e) { } catch (StreamComponentException e) {
if (log.isErrorEnabled()) { if (log.isErrorEnabled()) {
log.error("Cannot register outputs", e); log.error("Cannot register outputs", e);
} }
} }
numberOfOutputChannels = new int[outputs.size()]; int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) { for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0); numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
} }
setInvokable(); userFunction = (UserSourceInvokable) streamSourceHelper
// streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, .getSourceInvokable(taskConfiguration);
// outputs); streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
// streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
// outputs);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
userFunction = (UserSourceInvokable<Tuple>) getInvokable(userFunctionClass);
} }
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoked with instance id " + instanceID); log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
}
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
userFunction.invoke(collector);
if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);
} }
userFunction.invoke(streamSourceHelper.collector);
} }
} }
...@@ -21,88 +21,79 @@ import java.util.List; ...@@ -21,88 +21,79 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.runtime.io.api.AbstractRecordReader; import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractStreamComponent { public class StreamTask extends AbstractTask {
private static final Log log = LogFactory.getLog(StreamTask.class); private static final Log log = LogFactory.getLog(StreamTask.class);
private AbstractRecordReader inputs; private AbstractRecordReader inputs;
private List<RecordWriter<StreamRecord>> outputs; private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners; private List<ChannelSelector<StreamRecord>> partitioners;
private StreamRecordInvokable<Tuple, Tuple> userFunction; private UserTaskInvokable userFunction;
private int[] numberOfOutputChannels;
private static int numTasks; private static int numTasks;
private int taskInstanceID;
private String name;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer;
public StreamTask() { public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here // TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<StreamRecord>>(); outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>(); partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null; userFunction = null;
numTasks = newComponent(); numTasks = StreamComponentHelper.newComponent();
instanceID = numTasks; taskInstanceID = numTasks;
streamTaskHelper = new StreamComponentHelper<StreamTask>();
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
initialize(); taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try { try {
setSerializers(); streamTaskHelper.setSerializers(taskConfiguration);
inputs = getConfigInputs(); inputs = streamTaskHelper.getConfigInputs(this, taskConfiguration);
setConfigOutputs(outputs, partitioners); streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
setCollector(outputs); streamTaskHelper.setCollector(taskConfiguration, taskInstanceID, outputs);
} catch (StreamComponentException e) { } catch (StreamComponentException e) {
if (log.isErrorEnabled()) { if (log.isErrorEnabled()) {
log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e); log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e);
} }
} }
numberOfOutputChannels = new int[outputs.size()]; int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) { for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0); numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
} }
setInvokable(); userFunction = (UserTaskInvokable) streamTaskHelper.getTaskInvokable(taskConfiguration);
// streamTaskHelper.setAckListener(recordBuffer, taskInstanceID,
// outputs);
// streamTaskHelper.setFailListener(recordBuffer, taskInstanceID,
// outputs);
}
// TODO consider logging stack trace! streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
@SuppressWarnings({ "rawtypes", "unchecked" }) streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
userFunction = (UserTaskInvokable<Tuple, Tuple>) getInvokable(userFunctionClass);
} }
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoked with instance id " + instanceID); log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
} }
streamTaskHelper.invokeRecords(userFunction, inputs);
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoke finished with instance id " + instanceID); log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
} }
} }
} }
\ No newline at end of file
...@@ -19,10 +19,10 @@ import java.io.IOException; ...@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord> public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
...@@ -102,7 +102,6 @@ public class ArrayStreamRecord extends StreamRecord { ...@@ -102,7 +102,6 @@ public class ArrayStreamRecord extends StreamRecord {
* Value to set * Value to set
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , TupleSizeMismatchException * , TupleSizeMismatchException
* @return Returns the StreamRecord object
*/ */
public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException { public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
try { try {
......
...@@ -111,7 +111,6 @@ public class ListStreamRecord extends StreamRecord { ...@@ -111,7 +111,6 @@ public class ListStreamRecord extends StreamRecord {
* Value to set * Value to set
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , TupleSizeMismatchException * , TupleSizeMismatchException
* @return Returns the StreamRecord object
*/ */
public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException { public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
try { try {
......
...@@ -129,7 +129,6 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable { ...@@ -129,7 +129,6 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable {
* Value to set * Value to set
* @throws NoSuchTupleException * @throws NoSuchTupleException
* , TupleSizeMismatchException * , TupleSizeMismatchException
* @return Returns the StreamRecord object
*/ */
public abstract StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException; public abstract StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException;
...@@ -150,7 +149,6 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable { ...@@ -150,7 +149,6 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable {
* type of the tuple * type of the tuple
* @return Copy of the tuple * @return Copy of the tuple
*/ */
@SuppressWarnings("unchecked")
public static <T extends Tuple> T copyTuple(T tuple) { public static <T extends Tuple> T copyTuple(T tuple) {
// TODO: implement deep copy for arrays // TODO: implement deep copy for arrays
int numofFields = tuple.getArity(); int numofFields = tuple.getArity();
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.InputSplit;
public class DummyIS implements InputSplit {
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void read(DataInput in) throws IOException {
}
@Override
public int getSplitNumber() {
return 0;
}
}
\ No newline at end of file
...@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.basictopology; ...@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.basictopology;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment; import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class BasicTopology { public class BasicTopology {
...@@ -52,7 +52,7 @@ public class BasicTopology { ...@@ -52,7 +52,7 @@ public class BasicTopology {
private static final int SOURCE_PARALELISM = 1; private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM) DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM)
.map(new BasicMap(), PARALELISM); .map(new BasicMap(), PARALELISM);
......
...@@ -21,8 +21,8 @@ import eu.stratosphere.api.java.functions.FlatMapFunction; ...@@ -21,8 +21,8 @@ import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple4; import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment; import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class CellInfoLocal { public class CellInfoLocal {
...@@ -111,7 +111,7 @@ public class CellInfoLocal { ...@@ -111,7 +111,7 @@ public class CellInfoLocal {
//In this example two different source then connect the two stream and apply a function for the connected stream //In this example two different source then connect the two stream and apply a function for the connected stream
// TODO add arguments // TODO add arguments
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource( DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
new QuerySource(), SOURCE_PARALELISM); new QuerySource(), SOURCE_PARALELISM);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter; package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import eu.stratosphere.api.java.tuple.Tuple4; import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class CollaborativeFilteringSink extends SinkFunction<Tuple4<Integer, Integer, Integer, Long>> { public class CollaborativeFilteringSink extends SinkFunction<Tuple4<Integer, Integer, Integer, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -19,7 +19,7 @@ import java.io.BufferedReader; ...@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple4; import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class CollaborativeFilteringSource extends SourceFunction<Tuple4<Integer, Integer, Integer, Long>> { public class CollaborativeFilteringSource extends SourceFunction<Tuple4<Integer, Integer, Integer, Long>> {
......
...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils; ...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class KMeansLocal { public class KMeansLocal {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused") // @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env // DataStream<Tuple2<String, Integer>> dataStream = env
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.kmeans; package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class KMeansSink extends SinkFunction<Tuple3<Integer, Integer, Long>> { public class KMeansSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.iterative.kmeans; ...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.iterative.kmeans;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class KMeansSource extends SourceFunction<Tuple2<String, Long>> { public class KMeansSource extends SourceFunction<Tuple2<String, Long>> {
......
...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils; ...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class PageRankLocal { public class PageRankLocal {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused") // @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env // DataStream<Tuple2<String, Integer>> dataStream = env
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.pagerank; package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class PageRankSink extends SinkFunction<Tuple3<Integer, Float, Long>> { public class PageRankSink extends SinkFunction<Tuple3<Integer, Float, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -19,7 +19,7 @@ import java.io.BufferedReader; ...@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class PageRankSource extends SourceFunction<Tuple3<Integer, Integer, Long>> { public class PageRankSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
......
...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils; ...@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class SSSPLocal { public class SSSPLocal {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused") // @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env // DataStream<Tuple2<String, Integer>> dataStream = env
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.sssp; package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class SSSPSink extends SinkFunction<Tuple3<Integer, Integer, Long>> { public class SSSPSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -19,7 +19,7 @@ import java.io.BufferedReader; ...@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class SSSPSource extends SourceFunction<Tuple3<Integer, Integer, Long>> { public class SSSPSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
......
...@@ -34,7 +34,7 @@ public class JoinLocal { ...@@ -34,7 +34,7 @@ public class JoinLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new JoinSourceOne(), DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new JoinSourceOne(),
SOURCE_PARALELISM); SOURCE_PARALELISM);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.join; package eu.stratosphere.streaming.examples.join;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class JoinSink extends SinkFunction<Tuple3<String, Integer, Integer>> { public class JoinSink extends SinkFunction<Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join; ...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer>> { public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer>> {
......
...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join; ...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer>> { public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer>> {
......
...@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.ml; ...@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment; import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class IncrementalLearningSkeleton { public class IncrementalLearningSkeleton {
...@@ -124,7 +124,7 @@ public class IncrementalLearningSkeleton { ...@@ -124,7 +124,7 @@ public class IncrementalLearningSkeleton {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> model = DataStream<Tuple1<Integer>> model =
env.addSource(new TrainingDataSource(), SOURCE_PARALELISM) env.addSource(new TrainingDataSource(), SOURCE_PARALELISM)
......
...@@ -24,8 +24,8 @@ import eu.stratosphere.api.java.tuple.Tuple; ...@@ -24,8 +24,8 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment; import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class IncrementalOLS { public class IncrementalOLS {
...@@ -154,7 +154,7 @@ public class IncrementalOLS { ...@@ -154,7 +154,7 @@ public class IncrementalOLS {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Boolean, Double[]>> model = DataStream<Tuple2<Boolean, Double[]>> model =
env.addSource(new TrainingDataSource(), SOURCE_PARALELISM) env.addSource(new TrainingDataSource(), SOURCE_PARALELISM)
......
...@@ -36,7 +36,7 @@ public class WindowJoinLocal { ...@@ -36,7 +36,7 @@ public class WindowJoinLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource( DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM); new WindowJoinSourceOne(), SOURCE_PARALELISM);
......
...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join; ...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4; import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, Integer, Long>> { public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, Integer, Long>> {
......
...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join; ...@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random; import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4; import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, Integer, Long>> { public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, Integer, Long>> {
......
...@@ -27,7 +27,7 @@ public class WindowSumLocal { ...@@ -27,7 +27,7 @@ public class WindowSumLocal {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, Long>> dataStream = env DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM) .addSource(new WindowSumSource(), SOURCE_PARALELISM)
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.sum; package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> { public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.sum; package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> { public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
......
...@@ -27,7 +27,7 @@ public class WindowWordCountLocal { ...@@ -27,7 +27,7 @@ public class WindowWordCountLocal {
// This example will count the occurrence of each word in the input file with a sliding window. // This example will count the occurrence of each word in the input file with a sliding window.
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused") @SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Long>> dataStream = env DataStream<Tuple3<String, Integer, Long>> dataStream = env
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.wordcount; package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.function.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> { public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -19,7 +19,7 @@ import java.io.BufferedReader; ...@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.function.SourceFunction; import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> { public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> {
......
...@@ -23,16 +23,18 @@ import eu.stratosphere.streaming.util.TestDataUtil; ...@@ -23,16 +23,18 @@ import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountLocal { public class WordCountLocal {
// This example will count the occurrence of each word in the input file. // This example will count the occurrence of each word in the input file.
public static void main(String[] args) { public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt"); TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env DataStream<Tuple2<String, Integer>> dataStream = env
.readTextFile("src/test/resources/testdata/hamlet.txt") .readTextFile("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountSplitter()).partitionBy(0).map(new WordCountCounter()); .flatMap(new WordCountSplitter(), 1)
.partitionBy(0)
.map(new WordCountCounter(), 1);
dataStream.print(); dataStream.print();
env.execute(); env.execute();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册