提交 b44650b0 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Removed unused StreamInvokableComponent

上级 086d1a72
...@@ -7,16 +7,40 @@ ...@@ -7,16 +7,40 @@
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<version>0.2-SNAPSHOT</version> <version>0.2-SNAPSHOT</version>
<artifactId>stratosphere-streaming</artifactId> <artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name> <name>stratosphere-streaming</name>
<packaging>pom</packaging>
<url>http://github.com/stratosphere/stratosphere-streaming</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>stratosphere</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/stratosphere/stratosphere-streaming</url>
<connection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</connection>
<developerConnection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</developerConnection>
</scm>
<packaging>jar</packaging> <developers>
</developers>
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
<module>stratosphere-streaming-addons</module>
</modules>
<properties> <properties>
<stratosphere.version>0.5</stratosphere.version> <stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties> </properties>
<repositories> <repositories>
<repository> <repository>
<id>dms-repo</id> <id>dms-repo</id>
...@@ -25,47 +49,54 @@ ...@@ -25,47 +49,54 @@
<snapshots><enabled>true</enabled></snapshots> <snapshots><enabled>true</enabled></snapshots>
</repository> </repository>
</repositories> </repositories>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId> <artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId> <artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId> <artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId> <artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId> <artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId> <artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.7</version> <version>4.7</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.3.2</version> <version>3.3.2</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
...@@ -78,27 +109,14 @@ ...@@ -78,27 +109,14 @@
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
<version>1.2.16</version> <version>1.2.16</version>
</dependency> <type>jar</type>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<version>1.8.5</version> <version>1.8.5</version>
<scope>test</scope> <scope>test</scope>
<type>jar</type>
</dependency> </dependency>
</dependencies> </dependencies>
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent<OUT extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
private List<RecordWriter<StreamRecord>> outputs;
protected int channelID;
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void setPerfCounterDir(String dir) {
performanceCounter.setFname(dir + "/" + name + channelID);
}
public final void emit(StreamRecord record) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
if (log.isInfoEnabled()) {
log.info("EMITTED: " + record.getId() + " -- " + name);
}
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
if (log.isWarnEnabled()) {
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
}
// TODO: Should we fail record at exception catch?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
}
}
}
public String getResult() {
return "Override getResult() to pass your own results";
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedList;
public class StateManager implements Runnable, Serializable {
private static final long serialVersionUID = 1L;
private LinkedList<Object> stateList = new LinkedList<Object>();
private long checkpointInterval;
private String filename;
public StateManager(String filename, long checkpointIntervalMS) {
this.filename = filename;
this.checkpointInterval = checkpointIntervalMS;
}
public void registerState(Object state) {
stateList.add(state);
}
public void restoreState(){
ObjectInputStream ois = null;
try {
ois=new ObjectInputStream(new FileInputStream(filename));
} catch (Exception e) {
e.printStackTrace();
}
for (Object state : stateList){
try {
state= ois.readObject();
} catch (Exception e) {
e.printStackTrace();
}
}
}
//run checkpoint.
@Override
public void run() {
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(new FileOutputStream(filename));
} catch (Exception e) {
e.printStackTrace();
}
// take snapshot of every registered state.
while (true) {
try {
Thread.sleep(checkpointInterval);
for (Object state : stateList) {
oos.writeObject(state);
oos.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
ClusterProvider#clusterId=local1TM
LocalClusterProvider#numTaskTrackers=1
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=local_fs
\ No newline at end of file
ClusterProvider#clusterId=local4TM
LocalClusterProvider#numTaskTrackers=4
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=mini_hdfs
\ No newline at end of file
CoGroupITCase=local1TM,local4TM
CrossITCase=local1TM
MapITCase=local1TM,local4TM
MatchITCase=local1TM
ReduceITCase=local1TM
\ No newline at end of file
TPCHQuery3ITCase=local1TM,local4TM
TPCHQuery4ITCase=local1TM,local4TM
TPCHQuery9ITCase=local1TM,local4TM
TPCHQuery10ITCase=local1TM,local4TM
GlobalSortingITCase=local1TM,local4TM
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-addons</artifactId>
<name>stratosphere-streaming-addons</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.kafka; package eu.stratosphere.streaming.addons.kafka;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
...@@ -22,18 +22,8 @@ import java.util.Properties; ...@@ -22,18 +22,8 @@ import java.util.Properties;
import kafka.producer.KeyedMessage; import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; import kafka.producer.ProducerConfig;
/**
/* * Producer for Kafka jobs.
* This is a simple kafka producer that reads local file from disk and produces line streams.
* To use the producer, a zookeeper server and a kafka server should be on service.
* Run the following script to start a zookeeper server:
* bin/zookeeper-server-start.sh config/zookeeper.properties
* Run the following script to start a kafka server:
* bin/kafka-server-start.sh config/server.properties
* Run the following script to start the producer:
* java -cp kafka-0.8/libs/*:yourJarFile.jar eu.stratosphere.streaming.kafka.KafkaProducer yourTopicID kafkaServerIp
* As an example:
* java -cp kafka-0.8/libs/*:stratosphere-streaming.jar eu.stratosphere.streaming.kafka.KafkaProducer test localhost:9092
*/ */
public class KafkaProducer { public class KafkaProducer {
static kafka.javaapi.producer.Producer<Integer, String> producer; static kafka.javaapi.producer.Producer<Integer, String> producer;
...@@ -48,7 +38,7 @@ public class KafkaProducer { ...@@ -48,7 +38,7 @@ public class KafkaProducer {
} }
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
if (args.length == 3) { if (args.length >= 1) {
String infilename=args[0]; String infilename=args[0];
String topicId=args[1]; String topicId=args[1];
String brokerAddr=args[2]; String brokerAddr=args[2];
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.kafka; package eu.stratosphere.streaming.addons.kafka;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.kafka; package eu.stratosphere.streaming.addons.kafka;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
...@@ -24,7 +24,7 @@ public class KafkaTopology { ...@@ -24,7 +24,7 @@ public class KafkaTopology {
private static final int SOURCE_PARALELISM = 1; private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(); StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM) DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print(); .print();
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.performance; package eu.stratosphere.streaming.addons.performance;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
...@@ -33,7 +33,7 @@ public class WordCountPerformanceLocal { ...@@ -33,7 +33,7 @@ public class WordCountPerformanceLocal {
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment().setClusterSize(2); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("/home/strato/stratosphere-distrib/resources/hamlet.txt", 4) .readTextStream("/home/strato/stratosphere-distrib/resources/hamlet.txt", 4)
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.performance; package eu.stratosphere.streaming.addons.performance;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq; package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException; import java.io.IOException;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq; package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException; import java.io.IOException;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api; package eu.stratosphere.streaming.addons.rabbitmq;
import static org.junit.Assert.*; import static org.junit.Assert.*;
...@@ -25,8 +25,7 @@ import org.junit.Test; ...@@ -25,8 +25,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.rabbitmq.RMQSink; import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.rabbitmq.RMQSource;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq; package eu.stratosphere.streaming.addons.rabbitmq;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream; import eu.stratosphere.streaming.api.DataStream;
...@@ -24,7 +24,7 @@ public class RMQTopology { ...@@ -24,7 +24,7 @@ public class RMQTopology {
private static final int SOURCE_PARALELISM = 1; private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) { public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(); StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"), DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"),
SOURCE_PARALELISM).print(); SOURCE_PARALELISM).print();
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-core</artifactId>
<name>stratosphere-streaming-core</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
...@@ -213,6 +213,10 @@ public class DataStream<T extends Tuple> { ...@@ -213,6 +213,10 @@ public class DataStream<T extends Tuple> {
new FlatMapInvokable<T, R>(flatMapper), parallelism); new FlatMapInvokable<T, R>(flatMapper), parallelism);
} }
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return flatMap(flatMapper, 1);
}
/** /**
* Applies a Map transformation on a DataStream. The transformation calls a * Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call * MapFunction for each element of the DataStream. Each MapFunction call
...@@ -232,6 +236,10 @@ public class DataStream<T extends Tuple> { ...@@ -232,6 +236,10 @@ public class DataStream<T extends Tuple> {
parallelism); parallelism);
} }
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return map(mapper, 1);
}
/** /**
* Applies a reduce transformation on preset chunks of the DataStream. The * Applies a reduce transformation on preset chunks of the DataStream. The
* transformation calls a GroupReduceFunction for each tuple batch of the * transformation calls a GroupReduceFunction for each tuple batch of the
...@@ -255,6 +263,11 @@ public class DataStream<T extends Tuple> { ...@@ -255,6 +263,11 @@ public class DataStream<T extends Tuple> {
new BatchReduceInvokable<T, R>(reducer), parallelism); new BatchReduceInvokable<T, R>(reducer), parallelism);
} }
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
return batchReduce(reducer, batchSize, 1);
}
/** /**
* Applies a Filter transformation on a DataStream. The transformation calls * Applies a Filter transformation on a DataStream. The transformation calls
* a FilterFunction for each element of the DataStream and retains only * a FilterFunction for each element of the DataStream and retains only
...@@ -273,6 +286,10 @@ public class DataStream<T extends Tuple> { ...@@ -273,6 +286,10 @@ public class DataStream<T extends Tuple> {
new FilterInvokable<T>(filter), parallelism); new FilterInvokable<T>(filter), parallelism);
} }
public DataStream<T> filter(FilterFunction<T> filter) {
return filter(filter, 1);
}
/** /**
* Sets the given sink function. * Sets the given sink function.
* *
...@@ -297,34 +314,6 @@ public class DataStream<T extends Tuple> { ...@@ -297,34 +314,6 @@ public class DataStream<T extends Tuple> {
return environment.addSink(this.copy(), sinkFunction); return environment.addSink(this.copy(), sinkFunction);
} }
/**
* Sets the given sink function with stream id.
*
* @param streamId
* The stream id that identifies the stream.
* @param sinkFunction
* The object containing the sink's invoke function.
* @param parallelism
* The number of threads the function runs on.
* @return The modified datastream.
*/
public DataStream<T> addSink(String streamId, SinkFunction<T> sinkFunction, int parallelism) {
return environment.addSink(streamId, this.copy(), sinkFunction, parallelism);
}
/**
* Sets the given sink function with stream id.
*
* @param streamId
* The stream id that identifies the stream.
* @param sinkFunction
* The object containing the sink's invoke function.
* @return The closed datastream.
*/
public DataStream<T> addSink(String streamId, SinkFunction<T> sinkFunction) {
return environment.addSink(streamId, this.copy(), sinkFunction);
}
/** /**
* Prints the tuples from the DataStream. * Prints the tuples from the DataStream.
* *
...@@ -334,29 +323,6 @@ public class DataStream<T extends Tuple> { ...@@ -334,29 +323,6 @@ public class DataStream<T extends Tuple> {
return environment.print(this.copy()); return environment.print(this.copy());
} }
/**
* Dump the tuples from the DataStream to disk.
* @param filename
* the name of the output file
* @param streamId
* The stream id that identifies the stream.
* @return The closed datastream.
*/
public DataStream<T> dumpDisk(String filename) {
return environment.dumpDisk(this.copy(), filename);
}
/**
* Dump the tuples from the DataStream to disk.
*
* @param streamId
* The stream id that identifies the stream.
* @return The closed datastream.
*/
public DataStream<T> dumpDisk(String streamId, String filename) {
return environment.dumpDisk(this.copy(), filename);
}
/** /**
* Set the type parameter. * Set the type parameter.
* *
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.util.Arrays;
import java.util.Collection;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsSource(T... elements) {
this.iterable = (Iterable<T>) Arrays.asList(elements);
}
public FromElementsSource(Collection<T> elements) {
this.iterable = (Iterable<T>) elements;
}
@Override
public void invoke(Collector<Tuple1<T>> collector) throws Exception {
for (T element : iterable) {
outTuple.f0 = element;
collector.collect(outTuple);
}
}
}
...@@ -28,8 +28,6 @@ import org.apache.commons.logging.LogFactory; ...@@ -28,8 +28,6 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex; import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
...@@ -37,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex; ...@@ -37,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex; import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig; import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
...@@ -63,8 +63,8 @@ public class JobGraphBuilder { ...@@ -63,8 +63,8 @@ public class JobGraphBuilder {
protected String maxParallelismVertexName; protected String maxParallelismVertexName;
protected int maxParallelism; protected int maxParallelism;
protected FaultToleranceType faultToleranceType; protected FaultToleranceType faultToleranceType;
private int batchSize; private int batchSize = 1;
private long batchTimeout; private long batchTimeout = 1000;
/** /**
* Creates a new JobGraph with the given name * Creates a new JobGraph with the given name
...@@ -87,24 +87,12 @@ public class JobGraphBuilder { ...@@ -87,24 +87,12 @@ public class JobGraphBuilder {
this.faultToleranceType = faultToleranceType; this.faultToleranceType = faultToleranceType;
} }
/** public void setDefaultBatchSize(int batchSize) {
* Creates a new JobGraph with the given parameters this.batchSize = batchSize;
* }
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Type of fault tolerance
* @param defaultBatchSize
* Default number of records to send at one emit
* @param defaultBatchTimeoutMillis
* defaultBatchTimeoutMillis
*/
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, public void setBatchTimeout(int timeout) {
int defaultBatchSize, long defaultBatchTimeoutMillis) { this.batchTimeout = timeout;
this(jobGraphName, faultToleranceType);
this.batchSize = defaultBatchSize;
this.batchTimeout = defaultBatchTimeoutMillis;
} }
/** /**
...@@ -124,14 +112,14 @@ public class JobGraphBuilder { ...@@ -124,14 +112,14 @@ public class JobGraphBuilder {
* Number of parallel instances on one task manager * Number of parallel instances on one task manager
*/ */
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject, public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { String operatorName, byte[] serializedFunction, int parallelism) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph); final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class); source.setInvokableClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction, setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance); parallelism);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName); log.debug("SOURCE: " + sourceName);
...@@ -156,12 +144,12 @@ public class JobGraphBuilder { ...@@ -156,12 +144,12 @@ public class JobGraphBuilder {
*/ */
public void setTask(String taskName, public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject, UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { String operatorName, byte[] serializedFunction, int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph); final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class); task.setInvokableClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction, setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance); parallelism);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName); log.debug("TASK: " + taskName);
...@@ -185,12 +173,11 @@ public class JobGraphBuilder { ...@@ -185,12 +173,11 @@ public class JobGraphBuilder {
* Number of parallel instances on one task manager * Number of parallel instances on one task manager
*/ */
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject, public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { String operatorName, byte[] serializedFunction, int parallelism) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph); final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class); sink.setInvokableClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, parallelism);
parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName); log.debug("SINK: " + sinkName);
...@@ -218,10 +205,11 @@ public class JobGraphBuilder { ...@@ -218,10 +205,11 @@ public class JobGraphBuilder {
*/ */
private void setComponent(String componentName, AbstractJobVertex component, private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction, Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism, int subtasksPerInstance) { int parallelism) {
component.setNumberOfSubtasks(parallelism); component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance); // TODO remove all NumberOfSubtasks setting
// component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) { if (parallelism > maxParallelism) {
maxParallelism = parallelism; maxParallelism = parallelism;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil.
*
* @param parallelism
* Number of parallel cores utilized.
*/
@Override
public void execute() {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
* Source Function used to generate the number sequence
*
*/
public class SequenceSource extends SourceFunction<Tuple1<Long>> {
private static final long serialVersionUID = 1L;
long from;
long to;
Tuple1<Long> outTuple = new Tuple1<Long>();
public SequenceSource(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
for (long i = from; i <= to; i++) {
outTuple.f0 = i;
collector.collect(outTuple);
}
}
}
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
package eu.stratosphere.streaming.api; package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord; import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -19,8 +19,8 @@ import java.util.ArrayList; ...@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
......
...@@ -15,10 +15,7 @@ ...@@ -15,10 +15,7 @@
package eu.stratosphere.streaming.api; package eu.stratosphere.streaming.api;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.util.Arrays; import java.util.Arrays;
...@@ -38,56 +35,62 @@ import eu.stratosphere.util.Collector; ...@@ -38,56 +35,62 @@ import eu.stratosphere.util.Collector;
* construct streaming topologies. * construct streaming topologies.
* *
*/ */
public class StreamExecutionEnvironment { public abstract class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder; protected JobGraphBuilder jobGraphBuilder;
private float clusterSize = 1; private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = -1;
/** /**
* General constructor specifying the batch size in which the tuples are * Constructor for creating StreamExecutionEnvironment
* transmitted and their timeout boundary.
*
* @param defaultBatchSize
* number of tuples in a batch
* @param defaultBatchTimeoutMillis
* timeout boundary in milliseconds
*/ */
public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) { protected StreamExecutionEnvironment() {
if (defaultBatchSize < 1) { jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE);
}
public void setDefaultBatchSize(int batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive."); throw new IllegalArgumentException("Batch size must be positive.");
} else {
jobGraphBuilder.setDefaultBatchSize(batchSize);
} }
if (defaultBatchTimeoutMillis < 1) { }
public void setBatchTimeout(int timeout) {
if (timeout < 1) {
throw new IllegalArgumentException("Batch timeout must be positive."); throw new IllegalArgumentException("Batch timeout must be positive.");
} else {
jobGraphBuilder.setBatchTimeout(timeout);
} }
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE,
defaultBatchSize, defaultBatchTimeoutMillis);
} }
/** /**
* Constructor for transmitting tuples individually with a 1 second timeout. * Partitioning strategy on the stream.
*/ */
public StreamExecutionEnvironment() { public static enum ConnectionType {
this(1, 1000); SHUFFLE, BROADCAST, FIELD
} }
/** public int getDegreeOfParallelism() {
* Set the number of machines in the executing cluster. Used for setting return degreeOfParallelism;
* task parallelism.
*
* @param clusterSize
* cluster size
* @return environment
*/
public StreamExecutionEnvironment setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
return this;
} }
/** public void setDegreeOfParallelism(int degreeOfParallelism) {
* Partitioning strategy on the stream. if (degreeOfParallelism < 1)
*/ throw new IllegalArgumentException("Degree of parallelism must be at least one.");
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD this.degreeOfParallelism = degreeOfParallelism;
}
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalDop);
}
public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
LocalStreamEnvironment lee = new LocalStreamEnvironment();
lee.setDegreeOfParallelism(degreeOfParallelism);
return lee;
} }
/** /**
...@@ -169,8 +172,7 @@ public class StreamExecutionEnvironment { ...@@ -169,8 +172,7 @@ public class StreamExecutionEnvironment {
DataStream<R> returnStream = new DataStream<R>(this, functionName); DataStream<R> returnStream = new DataStream<R>(this, functionName);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName, jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
serializeToByteArray(function), parallelism, serializeToByteArray(function), parallelism);
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId()); connectGraph(inputStream, returnStream.getId());
...@@ -195,42 +197,13 @@ public class StreamExecutionEnvironment { ...@@ -195,42 +197,13 @@ public class StreamExecutionEnvironment {
DataStream<T> returnStream = new DataStream<T>(this, "sink"); DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink", jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism, serializeToByteArray(sinkFunction), parallelism);
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId()); connectGraph(inputStream, returnStream.getId());
return returnStream; return returnStream;
} }
/**
* Ads a sink to the data stream closing it.
*
* @param streamId
* the stream id that identifies the stream.
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(String streamId, DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
/** /**
* Creates a new DataStream that contains a sequence of numbers. * Creates a new DataStream that contains a sequence of numbers.
* *
...@@ -244,33 +217,6 @@ public class StreamExecutionEnvironment { ...@@ -244,33 +217,6 @@ public class StreamExecutionEnvironment {
return addSource(new SequenceSource(from, to), 1); return addSource(new SequenceSource(from, to), 1);
} }
/**
* Source Function used to generate the number sequence
*
*/
private static final class SequenceSource extends SourceFunction<Tuple1<Long>> {
private static final long serialVersionUID = 1L;
long from;
long to;
Tuple1<Long> outTuple = new Tuple1<Long>();
public SequenceSource(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
for (long i = from; i <= to; i++) {
outTuple.f0 = i;
collector.collect(outTuple);
}
}
}
/** /**
* Creates a new DataStream that contains the given elements. The elements * Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all of the String or Integer. * must all be of the same type, for example, all of the String or Integer.
...@@ -288,7 +234,7 @@ public class StreamExecutionEnvironment { ...@@ -288,7 +234,7 @@ public class StreamExecutionEnvironment {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements"); DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data), jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1, 1); "elements", serializeToByteArray(data[0]), 1);
return returnStream.copy(); return returnStream.copy();
} }
...@@ -309,42 +255,11 @@ public class StreamExecutionEnvironment { ...@@ -309,42 +255,11 @@ public class StreamExecutionEnvironment {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements"); DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data), jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1, 1); "elements", serializeToByteArray(data.toArray()[0]), 1);
return returnStream.copy(); return returnStream.copy();
} }
/**
* SourceFunction created to use with fromElements and fromCollection
*
* @param <T>
* type of the returned stream
*/
private static class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsSource(T... elements) {
this.iterable = (Iterable<T>) Arrays.asList(elements);
}
public FromElementsSource(Collection<T> elements) {
this.iterable = (Iterable<T>) elements;
}
@Override
public void invoke(Collector<Tuple1<T>> collector) throws Exception {
for (T element : iterable) {
outTuple.f0 = element;
collector.collect(outTuple);
}
}
}
/** /**
* Ads a sink to the data stream closing it. To parallelism is defaulted to * Ads a sink to the data stream closing it. To parallelism is defaulted to
* 1. * 1.
...@@ -362,24 +277,6 @@ public class StreamExecutionEnvironment { ...@@ -362,24 +277,6 @@ public class StreamExecutionEnvironment {
return addSink(inputStream, sinkFunction, 1); return addSink(inputStream, sinkFunction, 1);
} }
/**
* Ads a sink to the data stream closing it. To parallelism is defaulted to
* 1.
* @param streamId
* the stream id that identifies the stream
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(String streamId, DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
return addSink(inputStream, sinkFunction, 1);
}
/** /**
* Dummy implementation of the SinkFunction writing every tuple to the * Dummy implementation of the SinkFunction writing every tuple to the
* standard output. Used for print. * standard output. Used for print.
...@@ -396,37 +293,6 @@ public class StreamExecutionEnvironment { ...@@ -396,37 +293,6 @@ public class StreamExecutionEnvironment {
} }
} }
/**
* Disk implementation of the SinkFunction writing every tuple to the
* local disk.
*
* @param <IN>
* Input tuple type
*/
private static final class DiskSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private String filename;
private BufferedWriter writer = null;
public DiskSink(String filename){
this.filename = filename;
}
@Override
public void invoke(IN tuple) {
try {
if (writer == null) {
writer = new BufferedWriter(new FileWriter(filename));
}
writer.write(tuple + "\n");
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/** /**
* Prints the tuples of the data stream to the standard output. * Prints the tuples of the data stream to the standard output.
...@@ -447,31 +313,9 @@ public class StreamExecutionEnvironment { ...@@ -447,31 +313,9 @@ public class StreamExecutionEnvironment {
} }
/** /**
* Dump the tuples of the data stream to the local disk. * Executes the JobGraph.
* **/
* @param inputStream public abstract void execute();
* the input data stream
* @param filename
* the name of the output file
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> dumpDisk(DataStream<T> inputStream, String filename){
DataStream<T> returnStream = addSink(inputStream, new DiskSink<T>(filename));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
// TODO: Link to JobGraph and ClusterUtil
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil.
*/
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
public void executeCluster() { public void executeCluster() {
ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123); ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123);
...@@ -494,8 +338,7 @@ public class StreamExecutionEnvironment { ...@@ -494,8 +338,7 @@ public class StreamExecutionEnvironment {
DataStream<T> returnStream = new DataStream<T>(this, "source"); DataStream<T> returnStream = new DataStream<T>(this, "source");
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source", jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism, serializeToByteArray(sourceFunction), parallelism);
(int) Math.ceil(parallelism / clusterSize));
return returnStream.copy(); return returnStream.copy();
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.examples; package eu.stratosphere.streaming.api.streamcomponent;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
......
...@@ -37,13 +37,13 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; ...@@ -37,13 +37,13 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent; import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction; import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager; import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
...@@ -65,7 +65,7 @@ import eu.stratosphere.streaming.partitioner.DefaultPartitioner; ...@@ -65,7 +65,7 @@ import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public final class StreamComponentHelper<T extends AbstractInvokable> { public final class StreamComponentHelper {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class); private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
private static int numComponents = 0; private static int numComponents = 0;
...@@ -194,41 +194,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -194,41 +194,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} }
} }
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration) public AbstractRecordReader getConfigInputs(AbstractInvokable taskBase,
throws StreamComponentException { Configuration taskConfiguration) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0); int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
if (numberOfInputs < 2) { if (numberOfInputs < 2) {
if (taskBase instanceof StreamTask) {
return new StreamRecordReader((StreamTask) taskBase, ArrayStreamRecord.class, return new StreamRecordReader(taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer); inDeserializationDelegate, inTupleSerializer);
} else if (taskBase instanceof StreamSink) {
return new StreamRecordReader((StreamSink) taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
} else { } else {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs]; MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs];
for (int i = 0; i < numberOfInputs; i++) { for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) { recordReaders[i] = new MutableRecordReader<StreamRecord>(taskBase);
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamTask) taskBase);
} else if (taskBase instanceof StreamSink) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamSink) taskBase);
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigInputs");
}
} }
return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class, return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer); inDeserializationDelegate, inTupleSerializer);
} }
} }
public void setConfigOutputs(T taskBase, Configuration taskConfiguration, public void setConfigOutputs(AbstractInvokable taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException { List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
...@@ -238,15 +227,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -238,15 +227,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
setPartitioner(taskConfiguration, i, partitioners); setPartitioner(taskConfiguration, i, partitioners);
ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i); ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
if (taskBase instanceof StreamTask) { outputs.add(new RecordWriter<StreamRecord>(taskBase, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputsPartitioned.size() < batchSizesPartitioned.size()) { if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i)); outputsPartitioned.add(outputs.get(i));
} else { } else {
...@@ -283,30 +265,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -283,30 +265,30 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction; return userFunction;
} }
@SuppressWarnings("rawtypes") @SuppressWarnings({ "rawtypes", "unchecked" })
public UserSinkInvokable getSinkInvokable(Configuration config) { public UserSinkInvokable<Tuple> getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction", Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class); DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable) getInvokable(userFunctionClass, config); return (UserSinkInvokable<Tuple>) getInvokable(userFunctionClass, config);
} }
// TODO consider logging stack trace! // TODO consider logging stack trace!
@SuppressWarnings("rawtypes") @SuppressWarnings({ "rawtypes", "unchecked" })
public UserTaskInvokable getTaskInvokable(Configuration config) { public UserTaskInvokable<Tuple, Tuple> getTaskInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source // Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction", Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class); DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable) getInvokable(userFunctionClass, config); return (UserTaskInvokable<Tuple, Tuple>) getInvokable(userFunctionClass, config);
} }
@SuppressWarnings("rawtypes") @SuppressWarnings({ "rawtypes", "unchecked" })
public UserSourceInvokable getSourceInvokable(Configuration config) { public UserSourceInvokable<Tuple> getSourceInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source // Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction", Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class); DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable) getInvokable(userFunctionClass, config); return (UserSourceInvokable<Tuple>) getInvokable(userFunctionClass, config);
} }
// TODO find a better solution for this // TODO find a better solution for this
...@@ -328,6 +310,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -328,6 +310,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private void setPartitioner(Configuration config, int numberOfOutputs, private void setPartitioner(Configuration config, int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) { List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass( Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class, "partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class); ChannelSelector.class);
...@@ -360,8 +343,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -360,8 +343,8 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} }
} }
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs) public void invokeRecords(StreamRecordInvokable<Tuple, Tuple> userFunction,
throws Exception { AbstractRecordReader inputs) throws Exception {
if (inputs instanceof UnionStreamRecordReader) { if (inputs instanceof UnionStreamRecordReader) {
UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs; UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs;
while (recordReader.hasNext()) { while (recordReader.hasNext()) {
......
/*********************************************************************************************************************** /***********************************************************************************************************************
* * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
...@@ -10,7 +9,6 @@ ...@@ -10,7 +9,6 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent; package eu.stratosphere.streaming.api.streamcomponent;
...@@ -19,19 +17,19 @@ import java.io.IOException; ...@@ -19,19 +17,19 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/** /**
* A record writer connects an input gate to an application. It allows the * A record writer connects an input gate to an application. It allows the
* application query for incoming records and read them from input gate. * application query for incoming records and read them from input gate.
* *
* @param <StreamRecord>
* The type of the record that can be read from this record reader.
*/ */
public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements
Reader<StreamRecord> { Reader<StreamRecord> {
...@@ -39,7 +37,6 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -39,7 +37,6 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
private final Class<? extends StreamRecord> recordType; private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<Tuple> deserializationDelegate; private DeserializationDelegate<Tuple> deserializationDelegate;
private TupleSerializer<Tuple> tupleSerializer; private TupleSerializer<Tuple> tupleSerializer;
/** /**
* Stores the last read record. * Stores the last read record.
*/ */
...@@ -62,41 +59,14 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -62,41 +59,14 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
* @param recordType * @param recordType
* The class of records that can be read from the record reader. * The class of records that can be read from the record reader.
* @param deserializationDelegate * @param deserializationDelegate
* Deserialization delegate * deserializationDelegate
* @param tupleSerializer
* Tuple serializer
*/
public StreamRecordReader(AbstractTask taskBase, Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord>
// get(), 0);
super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
/**
* Constructs a new record reader and registers a new input gate with the
* application's environment.
*
* @param outputBase
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
* @param deserializationDelegate
* Deserialization delegate
* @param tupleSerializer * @param tupleSerializer
* Tuple serializer * tupleSerializer
*/ */
public StreamRecordReader(AbstractOutputTask outputBase, public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate, DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) { TupleSerializer<Tuple> tupleSerializer) {
// super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> super(taskBase);
// get(), 0);
super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType; this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate; this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer; this.tupleSerializer = tupleSerializer;
...@@ -133,10 +103,11 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec ...@@ -133,10 +103,11 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
return true; return true;
case END_OF_SUPERSTEP: case END_OF_SUPERSTEP:
if (incrementEndOfSuperstepEventAndCheck()) if (incrementEndOfSuperstepEventAndCheck()) {
return false; return false;
else } else {
break; // fall through and wait for next record/event break; // fall through and wait for next record/event
}
case TASK_EVENT: case TASK_EVENT:
handleEvent(this.inputGate.getCurrentEvent()); handleEvent(this.inputGate.getCurrentEvent());
......
...@@ -18,25 +18,25 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -18,25 +18,25 @@ package eu.stratosphere.streaming.api.streamcomponent;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.AbstractOutputTask; import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask { public class StreamSink extends AbstractInvokable {
private static final Log log = LogFactory.getLog(StreamSink.class); private static final Log log = LogFactory.getLog(StreamSink.class);
private AbstractRecordReader inputs; private AbstractRecordReader inputs;
private UserSinkInvokable userFunction; private UserSinkInvokable<Tuple> userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper; private StreamComponentHelper streamSinkHelper;
private String name; private String name;
public StreamSink() { public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here // TODO: Make configuration file visible and call setClassInputs() here
userFunction = null; userFunction = null;
streamSinkHelper = new StreamComponentHelper<StreamSink>(); streamSinkHelper = new StreamComponentHelper();
} }
@Override @Override
...@@ -54,8 +54,9 @@ public class StreamSink extends AbstractOutputTask { ...@@ -54,8 +54,9 @@ public class StreamSink extends AbstractOutputTask {
} }
} }
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration // FaultToleranceType faultToleranceType =
.getInteger("faultToleranceType", 0)); // FaultToleranceType.from(taskConfiguration
// .getInteger("faultToleranceType", 0));
userFunction = streamSinkHelper.getSinkInvokable(taskConfiguration); userFunction = streamSinkHelper.getSinkInvokable(taskConfiguration);
} }
...@@ -65,6 +66,7 @@ public class StreamSink extends AbstractOutputTask { ...@@ -65,6 +66,7 @@ public class StreamSink extends AbstractOutputTask {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoked"); log.debug("SINK " + name + " invoked");
} }
streamSinkHelper.invokeRecords(userFunction, inputs); streamSinkHelper.invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoke finished"); log.debug("SINK " + name + " invoke finished");
......
...@@ -21,50 +21,37 @@ import java.util.List; ...@@ -21,50 +21,37 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInputTask<DummyIS> { public class StreamSource extends AbstractInvokable {
private static final Log log = LogFactory.getLog(StreamSource.class); private static final Log log = LogFactory.getLog(StreamSource.class);
private List<RecordWriter<StreamRecord>> outputs; private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners; private List<ChannelSelector<StreamRecord>> partitioners;
private UserSourceInvokable userFunction; private UserSourceInvokable<Tuple> userFunction;
private static int numSources; private static int numSources;
private int sourceInstanceID; private int sourceInstanceID;
private String name; private String name;
private FaultToleranceUtil recordBuffer; // private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType; // private FaultToleranceType faultToleranceType;
StreamComponentHelper<StreamSource> streamSourceHelper; StreamComponentHelper streamSourceHelper;
public StreamSource() { public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<StreamRecord>>(); outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>(); partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null; userFunction = null;
streamSourceHelper = new StreamComponentHelper<StreamSource>(); streamSourceHelper = new StreamComponentHelper();
numSources = StreamComponentHelper.newComponent(); numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources; sourceInstanceID = numSources;
} }
@Override
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<DummyIS> getInputSplitType() {
return null;
}
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration(); Configuration taskConfiguration = getTaskConfiguration();
...@@ -85,10 +72,10 @@ public class StreamSource extends AbstractInputTask<DummyIS> { ...@@ -85,10 +72,10 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0); numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
} }
userFunction = (UserSourceInvokable) streamSourceHelper userFunction = (UserSourceInvokable<Tuple>) streamSourceHelper
.getSourceInvokable(taskConfiguration); .getSourceInvokable(taskConfiguration);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs); // streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs); // streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
} }
@Override @Override
...@@ -96,6 +83,10 @@ public class StreamSource extends AbstractInputTask<DummyIS> { ...@@ -96,6 +83,10 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID); log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
} }
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
userFunction.invoke(streamSourceHelper.collector); userFunction.invoke(streamSourceHelper.collector);
} }
......
...@@ -21,32 +21,31 @@ import java.util.List; ...@@ -21,32 +21,31 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader; import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractTask { public class StreamTask extends AbstractInvokable {
private static final Log log = LogFactory.getLog(StreamTask.class); private static final Log log = LogFactory.getLog(StreamTask.class);
private AbstractRecordReader inputs; private AbstractRecordReader inputs;
private List<RecordWriter<StreamRecord>> outputs; private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners; private List<ChannelSelector<StreamRecord>> partitioners;
private UserTaskInvokable userFunction; private UserTaskInvokable<Tuple, Tuple> userFunction;
private static int numTasks; private static int numTasks;
private int taskInstanceID; private int taskInstanceID;
private String name; private String name;
private StreamComponentHelper<StreamTask> streamTaskHelper; private StreamComponentHelper streamTaskHelper;
private FaultToleranceType faultToleranceType; // private FaultToleranceType faultToleranceType;
Configuration taskConfiguration; Configuration taskConfiguration;
private FaultToleranceUtil recordBuffer; // private FaultToleranceUtil recordBuffer;
public StreamTask() { public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here // TODO: Make configuration file visible and call setClassInputs() here
...@@ -55,7 +54,7 @@ public class StreamTask extends AbstractTask { ...@@ -55,7 +54,7 @@ public class StreamTask extends AbstractTask {
userFunction = null; userFunction = null;
numTasks = StreamComponentHelper.newComponent(); numTasks = StreamComponentHelper.newComponent();
taskInstanceID = numTasks; taskInstanceID = numTasks;
streamTaskHelper = new StreamComponentHelper<StreamTask>(); streamTaskHelper = new StreamComponentHelper();
} }
@Override @Override
...@@ -79,10 +78,11 @@ public class StreamTask extends AbstractTask { ...@@ -79,10 +78,11 @@ public class StreamTask extends AbstractTask {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0); numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
} }
userFunction = (UserTaskInvokable) streamTaskHelper.getTaskInvokable(taskConfiguration); userFunction = (UserTaskInvokable<Tuple, Tuple>) streamTaskHelper
.getTaskInvokable(taskConfiguration);
streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs); // streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs); // streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
} }
@Override @Override
...@@ -90,6 +90,11 @@ public class StreamTask extends AbstractTask { ...@@ -90,6 +90,11 @@ public class StreamTask extends AbstractTask {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID); log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
} }
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
streamTaskHelper.invokeRecords(userFunction, inputs); streamTaskHelper.invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
......
...@@ -19,69 +19,67 @@ import java.util.ArrayList; ...@@ -19,69 +19,67 @@ import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState; import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.streaming.state.StateManager;
import eu.stratosphere.util.Collector; import eu.stratosphere.util.Collector;
public class StreamWindowTask<InTuple extends Tuple, OutTuple extends Tuple> extends FlatMapFunction<InTuple, OutTuple> { public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
private int computeGranularity; private int computeGranularity;
private int windowFieldId; private int windowFieldId = 1;
private ArrayList<InTuple> tempTupleArray; private ArrayList tempArrayList;
private SlidingWindowState<InTuple> window; private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1; private long initTimestamp = -1;
private long nextTimestamp = -1; private long nextTimestamp = -1;
protected StateManager checkpointer = new StateManager("object.out", 1000);
public StreamWindowTask(int windowSize, int slidingStep, public StreamWindowTask(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) { int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity; this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId; this.windowFieldId = windowFieldId;
window = new SlidingWindowState<InTuple>(windowSize, slidingStep, window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity); computeGranularity);
checkpointer.registerState(window); sum = new MutableTableState<String, Integer>();
Thread t = new Thread(checkpointer); sum.put("sum", 0);
t.start();
} }
protected void incrementCompute(ArrayList<InTuple> tupleArray) {} private void incrementCompute(ArrayList tupleArray) {}
protected void decrementCompute(ArrayList<InTuple> tupleArray) {} private void decrementCompute(ArrayList tupleArray) {}
protected void produceOutput(long progress, Collector<OutTuple> out) {} private void produceOutput(long progress, Collector out) {}
@Override @Override
public void flatMap(InTuple value, Collector<OutTuple> out) public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
throws Exception {
long progress = (Long) value.getField(windowFieldId); long progress = (Long) value.getField(windowFieldId);
if (initTimestamp == -1) { if (initTimestamp == -1) {
initTimestamp = progress; initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity; nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<InTuple>(); tempArrayList = new ArrayList();
} else { } else {
if (progress > nextTimestamp) { if (progress > nextTimestamp) {
if (window.isFull()) { if (window.isFull()) {
ArrayList<InTuple> expiredTupleArray = window.popFront(); ArrayList expiredArrayList = window.popFront();
incrementCompute(tempTupleArray); incrementCompute(tempArrayList);
decrementCompute(expiredTupleArray); decrementCompute(expiredArrayList);
window.pushBack(tempTupleArray); window.pushBack(tempArrayList);
if (window.isEmittable()) { if (window.isEmittable()) {
produceOutput(progress, out); produceOutput(progress, out);
} }
} else { } else {
incrementCompute(tempTupleArray); incrementCompute(tempArrayList);
window.pushBack(tempTupleArray); window.pushBack(tempArrayList);
if (window.isFull()) { if (window.isFull()) {
produceOutput(progress, out); produceOutput(progress, out);
} }
} }
initTimestamp = nextTimestamp; initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity; nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<InTuple>(); tempArrayList = new ArrayList();
} }
tempTupleArray.add(value); tempArrayList.add(value);
} }
} }
} }
...@@ -19,10 +19,10 @@ import java.io.IOException; ...@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord> public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
...@@ -20,7 +20,7 @@ import java.util.List; ...@@ -20,7 +20,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID; import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceCounter; import eu.stratosphere.streaming.util.PerformanceCounter;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner; package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BroadcastPartitioner implements ChannelSelector<StreamRecord> { public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner; package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultPartitioner implements ChannelSelector<StreamRecord> { public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner; package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Grouping by a key //Grouping by a key
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner; package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Group to the partitioner with the lowest id //Group to the partitioner with the lowest id
......
...@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.partitioner; ...@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.partitioner;
import java.util.Random; import java.util.Random;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Randomly group, to distribute equally //Randomly group, to distribute equally
......
...@@ -22,27 +22,41 @@ import java.util.Map; ...@@ -22,27 +22,41 @@ import java.util.Map;
/** /**
* The most general internal state that stores data in a mutable map. * The most general internal state that stores data in a mutable map.
*/ */
public class TableState<K, V> implements Serializable { public class MutableTableState<K, V> implements TableState<K, V>, Serializable {
private Map<K, V> state=new LinkedHashMap<K, V>(); private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
public void put(K key, V value) { public void put(K key, V value) {
state.put(key, value); state.put(key, value);
} }
@Override
public V get(K key) { public V get(K key) {
return state.get(key); return state.get(key);
} }
@Override
public void delete(K key) { public void delete(K key) {
state.remove(key); state.remove(key);
} }
@Override
public boolean containsKey(K key) { public boolean containsKey(K key) {
return state.containsKey(key); return state.containsKey(key);
} }
public TableStateIterator<K, V> getIterator() { @Override
return new TableStateIterator<K, V>(state.entrySet().iterator()); public MutableTableStateIterator<K, V> getIterator() {
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
@Override
public String serialize() {
return null;
} }
@Override
public void deserialize(String str) {
}
} }
...@@ -20,17 +20,19 @@ import java.util.Map.Entry; ...@@ -20,17 +20,19 @@ import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
public class TableStateIterator<K, V>{ public class MutableTableStateIterator<K, V> implements TableStateIterator<K, V>{
private Iterator<Entry<K, V>> iterator; private Iterator<Entry<K, V>> iterator;
public TableStateIterator(Iterator<Entry<K, V>> iter){ public MutableTableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter; iterator=iter;
} }
@Override
public boolean hasNext() { public boolean hasNext() {
return iterator.hasNext(); return iterator.hasNext();
} }
@Override
public Tuple2<K, V> next() { public Tuple2<K, V> next() {
Entry<K, V> entry=iterator.next(); Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue()); return new Tuple2<K, V>(entry.getKey(), entry.getValue());
......
...@@ -20,15 +20,13 @@ import java.util.ArrayList; ...@@ -20,15 +20,13 @@ import java.util.ArrayList;
import org.apache.commons.collections.buffer.CircularFifoBuffer; import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.api.java.tuple.Tuple;
/** /**
* The window state for window operator. To be general enough, this class * The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to * implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the * compose time based window operator by extending this class by splitting the
* stream into multiple mini batches. * stream into multiple mini batches.
*/ */
public class SlidingWindowState<InTuple extends Tuple> implements Serializable{ public class SlidingWindowState implements Serializable{
private static final long serialVersionUID = -2376149970115888901L; private static final long serialVersionUID = -2376149970115888901L;
private int currentRecordCount; private int currentRecordCount;
private int fullRecordCount; private int fullRecordCount;
...@@ -45,13 +43,13 @@ public class SlidingWindowState<InTuple extends Tuple> implements Serializable{ ...@@ -45,13 +43,13 @@ public class SlidingWindowState<InTuple extends Tuple> implements Serializable{
this.buffer = new CircularFifoBuffer(fullRecordCount); this.buffer = new CircularFifoBuffer(fullRecordCount);
} }
public void pushBack(ArrayList<InTuple> tupleArray) { public void pushBack(ArrayList tupleArray) {
buffer.add(tupleArray); buffer.add(tupleArray);
currentRecordCount += 1; currentRecordCount += 1;
} }
public ArrayList<InTuple> popFront() { public ArrayList popFront() {
ArrayList<InTuple> frontRecord = (ArrayList<InTuple>) buffer.get(); ArrayList frontRecord = (ArrayList) buffer.get();
buffer.remove(); buffer.remove();
return frontRecord; return frontRecord;
} }
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class SlidingWindowStateIterator<K>{
public boolean hasNext() {
return false;
}
public Tuple2<K, StreamRecord> next() {
return null;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface TableState<K, V>{
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public String serialize();
public void deserialize(String str);
public TableStateIterator<K, V> getIterator();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
/**
* the iterator for internal states.
*/
public interface TableStateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state.manager;
import java.util.LinkedList;
import eu.stratosphere.streaming.state.TableState;
public class StateCheckpointer {
private LinkedList<TableState> stateList = new LinkedList<TableState>();
public void RegisterState(TableState state){
stateList.add(state);
}
public void CheckpointStates(){
for(TableState state: stateList){
//take snapshot of every registered state.
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state.manager;
public class StateRestorer {
}
...@@ -30,14 +30,14 @@ public class ClusterUtil { ...@@ -30,14 +30,14 @@ public class ClusterUtil {
* *
* @param jobGraph * @param jobGraph
*/ */
public static void runOnMiniCluster(JobGraph jobGraph) { public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
System.out.println("Running on mini cluster");
Configuration configuration = jobGraph.getJobConfiguration(); Configuration configuration = jobGraph.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster(); NepheleMiniCluster exec = new NepheleMiniCluster();
exec.setNumTaskTracker(numberOfTaskTrackers);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
System.out.println("Running on mini cluster");
try { try {
exec.start(); exec.start();
...@@ -45,6 +45,8 @@ public class ClusterUtil { ...@@ -45,6 +45,8 @@ public class ClusterUtil {
exec.stop(); exec.stop();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
} }
} }
......
...@@ -74,7 +74,7 @@ public class BatchReduceTest { ...@@ -74,7 +74,7 @@ public class BatchReduceTest {
@Test @Test
public void test() throws Exception { public void test() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple1<Double>> dataStream0 = env.addSource(new MySource(),1) DataStream<Tuple1<Double>> dataStream0 = env.addSource(new MySource(),1)
.batchReduce(new MyBatchReduce(), BATCH_SIZE, PARALELISM).addSink(new MySink()); .batchReduce(new MyBatchReduce(), BATCH_SIZE, PARALELISM).addSink(new MySink());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册