Commit 47e2c821 authored by gyfora, committed by Stephan Ewen

[streaming] setParallelism added for datastream operators to match main project

Parent 90df9539
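The gist of the change, as a minimal hypothetical sketch (MySplitter and MySink are placeholder user functions, not part of this commit): operator parallelism is now set with setParallelism() on the returned DataStream instead of being passed as an extra argument to each transformation call.

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<Tuple1<String>> words = env
        .fromElements("to", "be", "or", "not", "to", "be")
        // before this commit: .flatMap(new MySplitter(), 4) -- parallelism as a call argument
        .flatMap(new MySplitter()).setParallelism(4)
        .addSink(new MySink());

env.execute();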
......@@ -5,18 +5,42 @@
<modelVersion>4.0.0</modelVersion>
<groupId>eu.stratosphere</groupId>
<version>0.2-SNAPSHOT</version>
<version>0.3-SNAPSHOT</version>
<artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name>
<packaging>pom</packaging>
<url>http://github.com/stratosphere/stratosphere-streaming</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>stratosphere</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/stratosphere/stratosphere-streaming</url>
<connection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</connection>
<developerConnection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</developerConnection>
</scm>
<packaging>jar</packaging>
<developers>
</developers>
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
<module>stratosphere-streaming-addons</module>
</modules>
<properties>
<stratosphere.version>0.5</stratosphere.version>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<repositories>
<repository>
<id>dms-repo</id>
......@@ -25,47 +49,54 @@
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
......@@ -78,27 +109,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
<type>jar</type>
</dependency>
</dependencies>
......@@ -218,6 +236,7 @@
<excludes>
<!-- Additional files like .gitignore etc.-->
<exclude>**/.*</exclude>
<exclude>**/.*/**</exclude>
<!-- Resource files which have values. -->
<exclude>**/resources/**</exclude>
<!-- Configuration Files. -->
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.util.PerformanceCounter;
public abstract class StreamInvokableComponent<OUT extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
private List<RecordWriter<StreamRecord>> outputs;
protected int channelID;
protected String name;
private FaultToleranceUtil emittedRecords;
protected PerformanceCounter performanceCounter;
private boolean useFaultTolerance;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, int channelID,
String name, FaultToleranceUtil emittedRecords, FaultToleranceType faultToleranceType) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
this.name = name;
this.performanceCounter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/" + name + channelID);
this.useFaultTolerance = faultToleranceType != FaultToleranceType.NONE;
}
public final void setPerfCounterDir(String dir) {
performanceCounter.setFname(dir + "/" + name + channelID);
}
public final void emit(StreamRecord record) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record);
}
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
if (log.isInfoEnabled()) {
log.info("EMITTED: " + record.getId() + " -- " + name);
}
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
if (log.isWarnEnabled()) {
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
}
// TODO: Should we mark the record as failed when an exception is caught?
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
if (useFaultTolerance) {
emittedRecords.addRecord(record, outputChannel);
}
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
}
}
}
public String getResult() {
return "Override getResult() to pass your own results";
}
}
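A brief hypothetical sketch (not part of this commit) of how a concrete component might build on the abstract class above: the runtime is assumed to call declareOutputs(...) first, after which the subclass forwards processed records through emit(). The class name EchoComponent is illustrative only.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;

public class EchoComponent extends StreamInvokableComponent<Tuple1<String>> {
	private static final long serialVersionUID = 1L;

	// Forwards every incoming record unchanged to all declared output channels.
	public void forward(StreamRecord record) {
		emit(record);
	}

	@Override
	public String getResult() {
		return "EchoComponent forwarded records on channel " + channelID;
	}
}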
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.InputSplit;
public class DummyIS implements InputSplit {
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void read(DataInput in) throws IOException {
}
@Override
public int getSplitNumber() {
return 0;
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.wordcount;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private String line = "";
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private Long timestamp = 0L;
// Reads the lines of the input file and attaches a timestamp to each line.
@Override
public void invoke(Collector<Tuple2<String, Long>> collector) throws Exception {
BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
while(true){
line = br.readLine();
if(line==null){
break;
}
if (!line.isEmpty()) {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
outRecord.f0 = line;
outRecord.f1 = timestamp;
collector.collect(outRecord);
timestamp++;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import java.util.*;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.SinkFunction;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>{
private static final long serialVersionUID = 1L;
private kafka.javaapi.producer.Producer<Integer, OUT> producer;
static Properties props;
private String topicId;
private String brokerAddr;
private boolean close = false;
private boolean initDone = false;
public KafkaSink(String topicId, String brokerAddr){
this.topicId=topicId;
this.brokerAddr=brokerAddr;
}
public void initialize() {
props = new Properties();
props.put("metadata.broker.list", brokerAddr);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<Integer, OUT>(config);
initDone = true;
}
// TODO: should there be an end character, or should the producer be opened and closed for every message? (With parallelism, the end character might reach the sink before earlier messages.)
@Override
public void invoke(IN tuple) {
if(!initDone) initialize();
OUT out=serialize(tuple);
KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
producer.send(data);
if(close){
producer.close();
}
}
public abstract OUT serialize(IN tuple);
public void close(){
close=true;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
public class KafkaTopology {
public static final class MySource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
// TODO Auto-generated method stub
for(int i=0; i<10; i++){
collector.collect(new Tuple1<String>(Integer.toString(i)));
}
collector.collect(new Tuple1<String>("q"));
}
}
public static final class MyKafkaSource extends KafkaSource<Tuple1<String>, String>{
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
super(zkQuorum, groupId, topicId, numThreads);
// TODO Auto-generated constructor stub
}
@Override
public Tuple1<String> deserialize(byte[] msg) {
// TODO Auto-generated method stub
String s=new String(msg);
if(s.equals("q")){
close();
}
//System.out.println("ki: " + s);
return new Tuple1<String>(s);
}
}
public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String>{
public MyKafkaSink(String topicId, String brokerAddr) {
super(topicId, brokerAddr);
// TODO Auto-generated constructor stub
}
@Override
public String serialize(Tuple1<String> tuple) {
// TODO Auto-generated method stub
if(tuple.f0.equals("q")) close();
return tuple.f0;
}
}
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream1 = env.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
DataStream<Tuple1<String>> stream2 = env
.addSource(new MySource(), 1)
.addSink(new MyKafkaSink("test", "localhost:9092"));
env.execute();
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.performance;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.examples.wordcount.WordCountCounter;
public class WordCountPerformanceLocal {
public static class Mysink extends SinkFunction<Tuple2<String, Integer>> {
@Override
public void invoke(Tuple2<String, Integer> tuple) {
}
}
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment().setClusterSize(2);
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("/home/strato/stratosphere-distrib/resources/hamlet.txt", 4)
.flatMap(new WordCountPerformanceSplitter(), 2).partitionBy(0)
.map(new WordCountCounter(), 2).addSink(new Mysink(), 2);
env.executeCluster();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.performance;
import java.io.IOException;
import java.io.ObjectInputStream;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.util.Collector;
public class WordCountPerformanceSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
private Tuple1<String> outTuple = new Tuple1<String>();
PerformanceCounter pCounter;
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
out.collect(outTuple);
pCounter.count();
}
}
private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
ois.defaultReadObject();
pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/resources/splitter.csv");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.SerializationUtils;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class RMQTopology {
public static final class MyRMQSink extends RMQSink<Tuple1<String>> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
// TODO Auto-generated constructor stub
}
private static final long serialVersionUID = 1L;
@Override
public byte[] serialize(Tuple t) {
// TODO Auto-generated method stub
if(t.getField(0).equals("q")) close();
return SerializationUtils.serialize((String)t.getField(0));
}
}
public static final class MyRMQSource extends RMQSource<Tuple1<String>> {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
// TODO Auto-generated constructor stub
}
private static final long serialVersionUID = 1L;
@Override
public Tuple1<String> deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
Tuple1<String> out=new Tuple1<String>();
out.f0=s;
if(s.equals("q")){
close();
}
return out;
}
}
private static Set<String> result = new HashSet<String>();
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"), 1)
.print();
DataStream<Tuple1<String>> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
env.execute();
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import java.io.Serializable;
public class PerformanceCounter extends PerformanceTracker implements Serializable{
public PerformanceCounter(String name, int counterLength, int countInterval, String fname) {
super(name, counterLength, countInterval, fname);
}
public PerformanceCounter(String name, int counterLength, int countInterval, long dumpInterval,
String fname) {
super(name, counterLength, countInterval, dumpInterval, fname);
}
public PerformanceCounter(String name, String fname) {
super(name, fname);
}
public void count(long i, String label) {
buffer = buffer + i;
intervalCounter++;
if (intervalCounter % interval == 0) {
intervalCounter = 0;
add(buffer, label);
}
}
public void count(long i) {
count(i, "counter");
}
public void count(String label) {
count(1, label);
}
public void count() {
count(1, "counter");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
public class PerformanceTimer extends PerformanceTracker {
long timer;
boolean millis;
public PerformanceTimer(String name, int counterLength, int countInterval, boolean millis,
long dumpInterval, String fname) {
super(name, counterLength, countInterval, dumpInterval, fname);
this.millis = millis;
}
public PerformanceTimer(String name, int counterLength, int countInterval, boolean millis,
String fname) {
super(name, counterLength, countInterval, fname);
this.millis = millis;
}
public PerformanceTimer(String name, boolean millis, String fname) {
super(name, fname);
this.millis = millis;
}
public void startTimer() {
if (millis) {
timer = System.currentTimeMillis();
} else {
timer = System.nanoTime();
}
}
public void stopTimer(String label) {
if (millis) {
track(System.currentTimeMillis() - timer, label);
} else {
track(System.nanoTime() - timer, label);
}
}
public void stopTimer() {
stopTimer("timer");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class PerformanceTracker implements Serializable{
protected List<Long> timeStamps;
protected List<Long> values;
protected List<String> labels;
protected long dumpInterval = 0;
protected long lastDump = 0;
protected String fname;
protected long startTime;
protected int interval;
protected int intervalCounter;
protected String name;
protected long buffer;
public PerformanceTracker(String name, String fname) {
timeStamps = new ArrayList<Long>();
values = new ArrayList<Long>();
labels = new ArrayList<String>();
this.interval = 1;
this.name = name;
this.fname = fname;
buffer = 0;
this.startTime = System.currentTimeMillis();
}
public PerformanceTracker(String name, int capacity, int interval, String fname) {
this(name, capacity, interval, 0, fname);
}
public PerformanceTracker(String name, int capacity, int interval, long dumpInterval,
String fname) {
timeStamps = new ArrayList<Long>(capacity);
values = new ArrayList<Long>(capacity);
labels = new ArrayList<String>(capacity);
this.interval = interval;
this.name = name;
buffer = 0;
this.dumpInterval = dumpInterval;
this.fname = fname;
this.startTime = System.currentTimeMillis();
}
public void track(Long value, String label) {
buffer = buffer + value;
intervalCounter++;
if (intervalCounter % interval == 0) {
add(buffer, label);
buffer = 0;
intervalCounter = 0;
}
}
public void add(Long value, String label) {
long ctime = System.currentTimeMillis() - startTime;
values.add(value);
labels.add(label);
timeStamps.add(ctime);
if (dumpInterval > 0) {
if (ctime - lastDump > dumpInterval) {
System.out.println("csv-be iras!!");
writeCSV();
lastDump = ctime;
}
}
}
public void track(Long value) {
track(value, "tracker");
}
public void track(int value, String label) {
track(Long.valueOf(value), label);
}
public void track(int value) {
track(Long.valueOf(value), "tracker");
}
public void track() {
track(1);
}
@Override
public String toString() {
StringBuilder csv = new StringBuilder();
csv.append("Time," + name + ",Label\n");
for (int i = 0; i < timeStamps.size(); i++) {
csv.append(timeStamps.get(i) + "," + values.get(i) + "," + labels.get(i) + "\n");
}
return csv.toString();
}
public void writeCSV() {
try {
PrintWriter out = new PrintWriter(fname);
out.print(toString());
out.close();
} catch (FileNotFoundException e) {
System.out.println("CSV output file not found");
}
}
public void writeCSV(String fname) {
try {
PrintWriter out = new PrintWriter(fname);
out.print(toString());
out.close();
} catch (FileNotFoundException e) {
System.out.println("CSV output file not found");
}
}
public void setFname(String fname) {
this.fname = fname;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import static org.junit.Assert.*;
import org.junit.Test;
public class PerformanceTrackerTest {
@Test
public void testPerformanceTracker() {
// fail("Not yet implemented");
}
@Test
public void testTrackLong() {
// fail("Not yet implemented");
}
@Test
public void testTrack() {
PerformanceTracker pT = new PerformanceTracker("tracker", "");
pT.track();
pT.track(3);
pT.track(1);
assertEquals(3, pT.timeStamps.size());
assertEquals(3, pT.values.size());
assertEquals(Long.valueOf(1), pT.values.get(0));
assertEquals(Long.valueOf(3), pT.values.get(1));
assertEquals(Long.valueOf(1), pT.values.get(2));
PerformanceTracker pT2 = new PerformanceTracker("tracker", 10, 2, "");
pT2.track(1);
pT2.track(3);
pT2.track(1);
pT2.track(3);
assertEquals(2, pT2.timeStamps.size());
assertEquals(2, pT2.values.size());
assertEquals(Long.valueOf(4), pT2.values.get(0));
assertEquals(Long.valueOf(4), pT2.values.get(1));
System.out.println(pT2);
System.out.println("--------------");
}
@Test
public void testCount() {
PerformanceCounter pC = new PerformanceCounter("counter", "");
pC.count();
pC.count(10);
pC.count();
assertEquals(3, pC.timeStamps.size());
assertEquals(3, pC.values.size());
assertEquals(Long.valueOf(1), pC.values.get(0));
assertEquals(Long.valueOf(11), pC.values.get(1));
assertEquals(Long.valueOf(12), pC.values.get(2));
System.out.println(pC);
System.out.println("--------------");
PerformanceCounter pT2 = new PerformanceCounter("counter", 1000, 10000, "");
for (int i = 0; i < 10000000; i++) {
pT2.count("test");
}
assertEquals(1000, pT2.timeStamps.size());
// pT2.writeCSV("C:/temp/test.csv");
}
@Test
public void testTimer() throws InterruptedException {
PerformanceTimer pT = new PerformanceTimer("timer", true, "");
pT.startTimer();
Thread.sleep(100);
pT.stopTimer();
System.out.println(pT.values.get(0));
assertEquals(1, pT.timeStamps.size());
assertEquals(1, pT.values.size());
assertTrue(pT.values.get(0) < 200);
System.out.println(pT);
}
}
ClusterProvider#clusterId=local1TM
LocalClusterProvider#numTaskTrackers=1
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=local_fs
\ No newline at end of file
ClusterProvider#clusterId=local4TM
LocalClusterProvider#numTaskTrackers=4
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=mini_hdfs
\ No newline at end of file
CoGroupITCase=local1TM,local4TM
CrossITCase=local1TM
MapITCase=local1TM,local4TM
MatchITCase=local1TM
ReduceITCase=local1TM
\ No newline at end of file
TPCHQuery3ITCase=local1TM,local4TM
TPCHQuery4ITCase=local1TM,local4TM
TPCHQuery9ITCase=local1TM,local4TM
TPCHQuery10ITCase=local1TM,local4TM
GlobalSortingITCase=local1TM,local4TM
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.3-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-addons</artifactId>
<name>stratosphere-streaming-addons</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
package eu.stratosphere.streaming.addons.kafka;
import java.io.BufferedReader;
import java.io.FileReader;
......@@ -22,6 +22,9 @@ import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Producer for Kafka jobs.
*/
public class KafkaProducer {
static kafka.javaapi.producer.Producer<Integer, String> producer;
static Properties props = new Properties();
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
package eu.stratosphere.streaming.addons.kafka;
import java.util.HashMap;
import java.util.List;
......@@ -24,15 +24,15 @@ import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
/**
* Source for reading messages from a Kafka queue.
* The source currently only supports string messages.
*/
public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<IN>{
public class KafkaSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
......@@ -40,9 +40,8 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
private final String topicId;
private final int numThreads;
private ConsumerConnector consumer;
private boolean close = false;
IN outTuple;
Tuple1<String> outTuple = new Tuple1<String>();
public KafkaSource(String zkQuorum, String groupId, String topicId,
int numThreads) {
......@@ -64,7 +63,7 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
}
@Override
public void invoke(Collector<IN> collector) throws Exception {
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
initializeConnection();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
......@@ -75,20 +74,12 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
IN out=deserialize(it.next().message());
if(!close){
collector.collect(out);
}
else {
String message = new String(it.next().message());
if (message.equals("q")) {
break;
}
outTuple.f0 = message;
collector.collect(outTuple);
}
consumer.shutdown();
}
public abstract IN deserialize(byte[] msg);
public void close(){
close=true;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.addons.kafka;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class KafkaTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
context.execute();
}
}
\ No newline at end of file
......@@ -13,12 +13,12 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
......@@ -29,16 +29,15 @@ import com.rabbitmq.client.Channel;
* support string messages. Other types will be added soon.
*
*/
public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN>{
public class RMQSink extends SinkFunction<Tuple1<String>>{
private static final long serialVersionUID = 1L;
private boolean close=false;
private String QUEUE_NAME;
private String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private boolean initDone=false;
public RMQSink(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
......@@ -59,37 +58,32 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN>{
e.printStackTrace();
}
initDone=true;
}
@Override
public void invoke(IN tuple) {
if(!initDone) initializeConnection();
public void invoke(Tuple1<String> tuple) {
initializeConnection();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
byte[] msg = serialize(tuple);
channel.basicPublish("", QUEUE_NAME, null, msg);
String message = tuple.f0;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
if(close){
try {
channel.close();
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
channel.close();
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public abstract byte[] serialize(Tuple t);
public void close(){
close=true;
}
}
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException;
......@@ -24,8 +24,8 @@ import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
/**
......@@ -33,13 +33,11 @@ import eu.stratosphere.util.Collector;
* support string messages. Other types will be added soon.
*
*/
public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
public class RMQSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private final String QUEUE_NAME;
private final String HOST_NAME;
private boolean close=false;
private transient ConnectionFactory factory;
private transient Connection connection;
......@@ -47,7 +45,9 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
IN outTuple;
private transient String message;
Tuple1<String> outTuple = new Tuple1<String>();
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
......@@ -66,41 +66,41 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
} catch (IOException e) {
}
}
@Override
public void invoke(Collector<IN> collector) throws Exception {
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
initializeConnection();
while (!close) {
try {
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
e.printStackTrace();
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
outTuple=deserialize(delivery.getBody());
if(!close)
collector.collect(outTuple);
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public abstract IN deserialize(byte[] t);
public void close(){
close=true;
while (true) {
try {
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
e.printStackTrace();
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
message = new String(delivery.getBody());
if (message.equals("q")) {
break;
}
outTuple.f0 = message;
collector.collect(outTuple);
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.addons.rabbitmq;
import static org.junit.Assert.*;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.util.Collector;
public class RMQTest {
public static final class MySink extends SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> tuple) {
result.add(tuple.f0);
}
}
private static Set<String> expected = new HashSet<String>();
private static Set<String> result = new HashSet<String>();
private static void fillExpected() {
expected.add("one");
expected.add("two");
expected.add("three");
expected.add("four");
expected.add("five");
}
@Test
public void RMQTest1() throws Exception {
//
// StreamExecutionEnvironment env = new StreamExecutionEnvironment();
//
// DataStream<Tuple1<String>> dataStream1 = env
// .addSource(new RMQSource("localhost", "hello"), 1)
// .addSink(new MySink());
//
// DataStream<Tuple1<String>> dataStream2 = env
// .fromElements("one", "two", "three", "four", "five", "q")
// .addSink(new RMQSink("localhost", "hello"));
//
// env.execute();
//
// fillExpected();
//
// assertEquals(expected, result);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.addons.rabbitmq;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class RMQTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"),
SOURCE_PARALELISM).print();
context.execute();
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.3-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-core</artifactId>
<name>stratosphere-streaming-core</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -24,21 +24,41 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.invokable.BatchReduceInvokable;
import eu.stratosphere.streaming.api.invokable.FilterInvokable;
import eu.stratosphere.streaming.api.invokable.FlatMapInvokable;
import eu.stratosphere.streaming.api.invokable.MapInvokable;
import eu.stratosphere.types.TypeInformation;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
* can be transformed into another DataStream by applying a transformation such as
* <ul>
* <li>{@link DataStream#map},</li>
* <li>{@link DataStream#filter}, or</li>
* <li>{@link DataStream#batchReduce}.</li>
* </ul>
*
* @param <T>
* The type of the DataStream, i.e., the type of the elements of the
* DataStream.
*/
public class DataStream<T extends Tuple> {
private static Integer counter = 0;
private final StreamExecutionEnvironment environment;
private TypeInformation<T> type;
private String id;
int dop;
List<String> connectIDs;
List<ConnectionType> ctypes;
List<Integer> cparams;
List<Integer> batchSizes;
/**
* Create a new DataStream in the given environment
* Create a new {@link DataStream} in the given execution environment
*
* @param environment
* StreamExecutionEnvironment
......@@ -59,12 +79,13 @@ public class DataStream<T extends Tuple> {
}
/**
* Create a new DataStream in the given environment with the given id
* Create a new {@link DataStream} in the given environment with the given
* id
*
* @param environment
* StreamExecutionEnvironment
* @param id
* The id of the stream
* The id of the DataStream
*/
private DataStream(StreamExecutionEnvironment environment, String operatorType, String id) {
this.environment = environment;
......@@ -72,7 +93,8 @@ public class DataStream<T extends Tuple> {
}
/**
* Initialize the connections.
* Initialize the connection and partitioning among the connected
* {@link DataStream}s.
*/
private void initConnections() {
connectIDs = new ArrayList<String>();
......@@ -87,7 +109,7 @@ public class DataStream<T extends Tuple> {
}
/**
* Creates an identical DataStream.
* Creates an identical {@link DataStream}.
*
* @return The DataStream copy.
*/
......@@ -100,25 +122,56 @@ public class DataStream<T extends Tuple> {
copiedStream.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(this.ctypes);
copiedStream.cparams = new ArrayList<Integer>(this.cparams);
copiedStream.batchSizes = new ArrayList<Integer>(this.batchSizes);
copiedStream.dop = this.dop;
return copiedStream;
}
/**
* Returns the id of the DataStream.
* Returns the ID of the {@link DataStream}.
*
* @return ID ID of the datastream
* @return ID of the datastream
*/
public String getId() {
return id;
}
/**
* Groups a number of consecutive elements from the DataStream to increase
* network throughput.
* Sets the degree of parallelism for this operator. The degree must be 1 or
* more.
*
* @param dop
* The degree of parallelism for this operator.
* @return The operator with set degree of parallelism.
*/
public DataStream<T> setParallelism(int dop) {
if (dop < 1) {
throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
}
this.dop = dop;
environment.setOperatorParallelism(this);
return this.copy();
}
/**
* Gets the degree of parallelism for this operator.
*
* @return The parallelism set for this operator.
*/
public int getParallelism() {
return this.dop;
}
/**
* Groups a number of consecutive elements from the {@link DataStream} to
* increase network throughput. It has no effect on the operators applied to
* the DataStream.
*
* @param batchSize
* The number of elements to group.
* @return The DataStream.
* @return The DataStream with batching set.
*/
public DataStream<T> batch(int batchSize) {
DataStream<T> returnStream = copy();
......@@ -134,12 +187,13 @@ public class DataStream<T extends Tuple> {
}
/**
* Connecting DataStream outputs with each other. The streams connected
* using this operator will be transformed simultaneously. It creates a
* joint output of the connected streams.
* Connecting {@link DataStream} outputs with each other for applying joint
* operators on them. The DataStreams connected using this operator will be
* transformed simultaneously. It creates a joint output of the connected
* DataStreams.
*
* @param streams
* The DataStream to connect output with.
* The DataStreams to connect output with.
* @return The connected DataStream.
*/
public DataStream<T> connectWith(DataStream<T>... streams) {
......@@ -151,7 +205,7 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
private DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams);
......@@ -161,8 +215,8 @@ public class DataStream<T extends Tuple> {
}
/**
* Send the output tuples of the DataStream to the next vertices partitioned
* by their hashcode.
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are partitioned by their hashcode and are sent to only one component.
*
* @param keyposition
* The field used to compute the hashcode.
......@@ -179,8 +233,8 @@ public class DataStream<T extends Tuple> {
}
/**
* Broadcast the output tuples to every parallel instance of the next
* component.
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component.
*
* @return The DataStream with broadcast partitioning set.
*/
......@@ -194,9 +248,27 @@ public class DataStream<T extends Tuple> {
}
/**
* Applies a FlatMap transformation on a DataStream. The transformation
* calls a FlatMapFunction for each element of the DataSet. Each
* FlatMapFunction call can return any number of elements including none.
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link MapFunction} for each element of the DataStream. Each
* MapFunction call returns exactly one element.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param <R>
* output type
* @return The transformed DataStream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper));
}
/**
* Applies a FlatMap transformation on a {@link DataStream}. The
* transformation calls a FlatMapFunction for each element of the DataStream.
* Each FlatMapFunction call can return any number of elements including
* none.
*
* @param flatMapper
* The FlatMapFunction that is called for each element of the
......@@ -208,35 +280,34 @@ public class DataStream<T extends Tuple> {
* output type
* @return The transformed DataStream.
*/
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, int parallelism) {
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return environment.addFunction("flatMap", this.copy(), flatMapper,
new FlatMapInvokable<T, R>(flatMapper), parallelism);
new FlatMapInvokable<T, R>(flatMapper));
}
/**
* Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call
* returns exactly one element.
* Applies a Filter transformation on a {@link DataStream}. The
* transformation calls a {@link FilterFunction} for each element of the
* DataStream and retains only those elements for which the function returns
* true. Elements for which the function returns false are filtered.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param filter
* The FilterFunction that is called for each element of the
* DataStream.
* @param parallelism
* The number of threads the function runs on.
* @param <R>
* output type
* @return The transformed DataStream.
* @return The filtered DataStream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
parallelism);
public DataStream<T> filter(FilterFunction<T> filter) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter));
}
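An illustrative filter usage sketch, again outside the diff; the Tuple1&lt;Integer&gt; element type and the positivity predicate are assumptions.

import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;

public class FilterSketch {
    // Keeps only the tuples whose Integer field is positive.
    public static DataStream<Tuple1<Integer>> positives(DataStream<Tuple1<Integer>> input) {
        return input.filter(new FilterFunction<Tuple1<Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public boolean filter(Tuple1<Integer> value) {
                return value.f0 > 0;
            }
        });
    }
}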
/**
* Applies a reduce transformation on preset chunks of the DataStream. The
* transformation calls a GroupReduceFunction for each tuple batch of the
* predefined size. Each GroupReduceFunction call can return any number of
* elements including none.
* transformation calls a {@link GroupReduceFunction} for each tuple batch
* of the predefined size. Each GroupReduceFunction call can return any
* number of elements including none.
*
*
* @param reducer
......@@ -247,67 +318,42 @@ public class DataStream<T extends Tuple> {
* The number of threads the function runs on.
* @param <R>
* output type
* @return The modified datastream.
* @return The modified DataStream.
*/
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize, int parallelism) {
int batchSize) {
return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer,
new BatchReduceInvokable<T, R>(reducer), parallelism);
}
/**
* Applies a Filter transformation on a DataStream. The transformation calls
* a FilterFunction for each element of the DataStream and retains only
* those element for which the function returns true. Elements for which the
* function returns false are filtered.
*
* @param filter
* The FilterFunction that is called for each element of the
* DataSet.
* @param parallelism
* The number of threads the function runs on.
* @return The filtered DataStream.
*/
public DataStream<T> filter(FilterFunction<T> filter, int parallelism) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter), parallelism);
new BatchReduceInvokable<T, R>(reducer));
}
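A sketch of how the two-argument batchReduce could be used to sum fixed-size batches; the Tuple1&lt;Integer&gt; element type and the batch size of 10 are illustrative assumptions.

import java.util.Iterator;

import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.util.Collector;

public class BatchReduceSketch {
    // Sums every batch of 10 tuples into a single output tuple.
    public static DataStream<Tuple1<Integer>> batchSums(DataStream<Tuple1<Integer>> input) {
        return input.batchReduce(new GroupReduceFunction<Tuple1<Integer>, Tuple1<Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void reduce(Iterator<Tuple1<Integer>> values, Collector<Tuple1<Integer>> out) {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().f0;
                }
                out.collect(new Tuple1<Integer>(sum));
            }
        }, 10);
    }
}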
/**
* Sets the given sink function.
* Adds the given sink to this environment. Only streams with sinks added
* will be executed once the {@link StreamExecutionEnvironment#execute()}
* method is called.
*
* @param sinkFunction
* The object containing the sink's invoke function.
* @param parallelism
* The number of threads the function runs on.
* @return The modified datastream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction, int parallelism) {
return environment.addSink(this.copy(), sinkFunction, parallelism);
}
/**
* Sets the given sink function.
*
* @param sinkFunction
* The object containing the sink's invoke function.
* @return The closed datastream.
* @return The modified DataStream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return environment.addSink(this.copy(), sinkFunction);
}
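An illustrative custom sink for the single-argument addSink; the stderr logging behaviour and Tuple1&lt;String&gt; type are assumptions. SinkFunction is imported from eu.stratosphere.streaming.api.function, matching the package move in this commit.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.function.SinkFunction;

public class SinkSketch {
    // A trivial sink that writes each tuple to standard error.
    public static class StderrSink extends SinkFunction<Tuple1<String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple1<String> tuple) {
            System.err.println(tuple);
        }
    }

    public static DataStream<Tuple1<String>> attachSink(DataStream<Tuple1<String>> input) {
        return input.addSink(new StderrSink());
    }
}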
/**
* Prints the tuples from the DataStream.
* Writes a DataStream to the standard output stream (stdout).<br/>
* For each element of the DataStream the result of
* {@link Object#toString()} is written.
*
* @return The closed datastream.
* @return The closed DataStream.
*/
public DataStream<T> print() {
return environment.print(this.copy());
}
/**
* Set the type parameter.
* Set the type parameter for this DataStream.
*
* @param type
* The type parameter.
......@@ -317,7 +363,7 @@ public class DataStream<T extends Tuple> {
}
/**
* Get the type information.
* Get the type information for this DataStream.
*
* @return The type of the generic parameter.
*/
......
......@@ -28,8 +28,6 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -37,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -59,12 +59,13 @@ public class JobGraphBuilder {
private final JobGraph jobGraph;
protected Map<String, AbstractJobVertex> components;
protected Map<String, Integer> numberOfInstances;
protected Map<String, List<Integer>> numberOfOutputChannels;
protected Map<String, List<String>> edgeList;
protected Map<String, List<Class<? extends ChannelSelector<StreamRecord>>>> connectionTypes;
protected String maxParallelismVertexName;
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
private int batchSize;
private long batchTimeout;
private int batchSize = 1;
private long batchTimeout = 1000;
/**
* Creates a new JobGraph with the given name
......@@ -78,7 +79,8 @@ public class JobGraphBuilder {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
numberOfOutputChannels = new HashMap<String, List<Integer>>();
edgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<Class<? extends ChannelSelector<StreamRecord>>>>();
maxParallelismVertexName = "";
maxParallelism = 0;
if (log.isDebugEnabled()) {
......@@ -87,24 +89,12 @@ public class JobGraphBuilder {
this.faultToleranceType = faultToleranceType;
}
/**
* Creates a new JobGraph with the given parameters
*
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Type of fault tolerance
* @param defaultBatchSize
* Default number of records to send at one emit
* @param defaultBatchTimeoutMillis
* defaultBatchTimeoutMillis
*/
public void setDefaultBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
int defaultBatchSize, long defaultBatchTimeoutMillis) {
this(jobGraphName, faultToleranceType);
this.batchSize = defaultBatchSize;
this.batchTimeout = defaultBatchTimeoutMillis;
public void setBatchTimeout(int timeout) {
this.batchTimeout = timeout;
}
/**
......@@ -120,18 +110,16 @@ public class JobGraphBuilder {
* Serialized udf
* @param parallelism
* Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
String operatorName, byte[] serializedFunction, int parallelism) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
source.setInvokableClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
parallelism);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
......@@ -151,17 +139,15 @@ public class JobGraphBuilder {
* Serialized udf
* @param parallelism
* Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
String operatorName, byte[] serializedFunction, int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setInvokableClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
parallelism);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
......@@ -181,16 +167,13 @@ public class JobGraphBuilder {
* Serialized udf
* @param parallelism
* Number of parallel instances created
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
String operatorName, byte[] serializedFunction, int parallelism) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
sink.setInvokableClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, parallelism);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
......@@ -218,10 +201,9 @@ public class JobGraphBuilder {
*/
private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism, int subtasksPerInstance) {
int parallelism) {
component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
......@@ -242,20 +224,6 @@ public class JobGraphBuilder {
numberOfInstances.put(componentName, parallelism);
}
/**
* Sets the number of tuples batched together for higher throughput
*
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
......@@ -285,86 +253,35 @@ public class JobGraphBuilder {
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* from
* @param to
* to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
* Sets the number of tuples batched together for higher throughput
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/**
* Sets instance sharing between the given components
* Sets the number of parallel instances created for the given component.
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
* @param componentName
* Name of the component
* @param parallelism
* Number of parallel instances created
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
public void setParallelism(String componentName, int parallelism) {
components.get(componentName).setNumberOfSubtasks(parallelism);
numberOfInstances.put(componentName, parallelism);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
maxParallelismVertexName = componentName;
}
}
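The new setParallelism both stores the instance count and keeps track of the vertex with the highest parallelism, which later drives automatic instance sharing. The following stand-alone sketch simply replays that bookkeeping pattern outside the builder; the component names and values are illustrative.

import java.util.HashMap;
import java.util.Map;

public class ParallelismBookkeepingSketch {
    private final Map<String, Integer> numberOfInstances = new HashMap<String, Integer>();
    private String maxParallelismVertexName = "";
    private int maxParallelism = 0;

    // Mirrors the pattern of JobGraphBuilder.setParallelism: record the count, track the maximum.
    public void setParallelism(String componentName, int parallelism) {
        numberOfInstances.put(componentName, parallelism);
        if (parallelism > maxParallelism) {
            maxParallelism = parallelism;
            maxParallelismVertexName = componentName;
        }
    }

    public static void main(String[] args) {
        ParallelismBookkeepingSketch b = new ParallelismBookkeepingSketch();
        b.setParallelism("source", 1);
        b.setParallelism("map", 4);
        // "map" now drives instance sharing because it has the highest parallelism.
        System.out.println(b.maxParallelismVertexName + " -> " + b.maxParallelism);
    }
}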
/**
......@@ -381,7 +298,6 @@ public class JobGraphBuilder {
*/
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
log.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
......@@ -406,6 +322,8 @@ public class JobGraphBuilder {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
addToEdges(upStreamComponentName, downStreamComponentName, FieldsPartitioner.class);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
......@@ -420,11 +338,6 @@ public class JobGraphBuilder {
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger("numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1),
numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
+ downStreamComponentName + ", KEY: " + keyPosition);
......@@ -440,6 +353,21 @@ public class JobGraphBuilder {
}
private void addToEdges(String upStreamComponentName, String downStreamComponentName,
		Class<? extends ChannelSelector<StreamRecord>> ctype) {
	if (edgeList.containsKey(upStreamComponentName)) {
		connectionTypes.get(upStreamComponentName).add(ctype);
		edgeList.get(upStreamComponentName).add(downStreamComponentName);
	} else {
		connectionTypes.put(upStreamComponentName,
				new ArrayList<Class<? extends ChannelSelector<StreamRecord>>>());
		connectionTypes.get(upStreamComponentName).add(ctype);
		edgeList.put(upStreamComponentName, new ArrayList<String>());
		edgeList.get(upStreamComponentName).add(downStreamComponentName);
	}
}
/**
* Connects two components with the given names by global partitioning.
* <p>
......@@ -453,7 +381,6 @@ public class JobGraphBuilder {
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
addOutputChannels(upStreamComponentName, 1);
log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
......@@ -471,28 +398,94 @@ public class JobGraphBuilder {
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
addOutputChannels(upStreamComponentName, 1);
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
* Sets the number of instances for a given component, used for fault
* tolerance purposes
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* upStreamComponentName
* @param numOfInstances
* numOfInstances
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
*/
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
} else {
numberOfOutputChannels.put(upStreamComponentName, new ArrayList<Integer>());
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
addToEdges(upStreamComponentName, downStreamComponentName, PartitionerClass);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* from
* @param to
* to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
}
/**
* Writes number of inputs into each JobVertex's config
*/
......@@ -512,13 +505,23 @@ public class JobGraphBuilder {
component.getConfiguration().setInteger("numberOfOutputs",
component.getNumberOfForwardConnections());
}
}
for (String component : numberOfOutputChannels.keySet()) {
Configuration config = components.get(component).getConfiguration();
List<Integer> channelNumList = numberOfOutputChannels.get(component);
for (int i = 0; i < channelNumList.size(); i++) {
config.setInteger("channels_" + i, channelNumList.get(i));
/**
* Sets partitioner parameters which can only be set when the full graph is
* built
*/
private void setPartitionerParameters() {
for (String componentName : connectionTypes.keySet()) {
int i = 0;
for (Class<?> ctype : connectionTypes.get(componentName)) {
if (ctype.equals(FieldsPartitioner.class)) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("numOfOutputs_" + i,
numberOfInstances.get(edgeList.get(componentName).get(i)));
}
}
i++;
}
}
......@@ -531,6 +534,7 @@ public class JobGraphBuilder {
setAutomaticInstanceSharing();
setNumberOfJobInputs();
setNumberOfJobOutputs();
setPartitionerParameters();
return jobGraph;
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/**
 * Executes the JobGraph on a mini cluster using ClusterUtil.
*
*/
@Override
public void execute() {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
}
public void executeTest(long memorySize) {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
memorySize);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.client.program.ProgramInvocationException;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.JobGraph;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
private String host;
private int port;
private String[] jarFiles;
public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
if (host == null) {
throw new NullPointerException("Host must not be null.");
}
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}
this.host = host;
this.port = port;
this.jarFiles = jarFiles;
}
@Override
public void execute() {
try {
JobGraph jobGraph = jobGraphBuilder.getJobGraph();
for (int i = 0; i < jarFiles.length; i++) {
File file = new File(jarFiles[i]);
JobWithJars.checkJarFile(file);
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(host, port), configuration);
client.run(jobGraph, true);
} catch (IOException e) {
if (log.isErrorEnabled()) {
log.error(e.getMessage());
}
} catch (ProgramInvocationException e) {
if (log.isErrorEnabled()) {
log.error(e.getMessage());
}
}
}
}
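A hedged construction example for the remote environment; the host, port and jar path are placeholders, and the pipeline definition itself is omitted.

import eu.stratosphere.streaming.api.RemoteStreamEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) {
        // Placeholder values: point these at a real JobManager and job jar.
        RemoteStreamEnvironment env =
                new RemoteStreamEnvironment("jobmanager-host", 6123, "/path/to/streaming-job.jar");

        // ... define sources, transformations and sinks on env here ...

        // Builds the JobGraph, attaches the jar and submits it to the cluster.
        env.execute();
    }
}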
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.BufferedReader;
import java.io.FileReader;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.BufferedReader;
import java.io.FileReader;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.function;
import java.util.Arrays;
import java.util.Collection;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsFunction(T... elements) {
this.iterable = (Iterable<T>) Arrays.asList(elements);
}
public FromElementsFunction(Collection<T> elements) {
this.iterable = (Iterable<T>) elements;
}
@Override
public void invoke(Collector<Tuple1<T>> collector) throws Exception {
for (T element : iterable) {
outTuple.f0 = element;
collector.collect(outTuple);
}
}
}
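Since FromElementsFunction is a plain source function, it can be exercised directly with a simple Collector. This stand-alone sketch (class name and printing behaviour are illustrative) shows the tuples it emits.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.FromElementsFunction;
import eu.stratosphere.util.Collector;

public class FromElementsSketch {
    public static void main(String[] args) throws Exception {
        FromElementsFunction<String> source = new FromElementsFunction<String>("a", "b", "c");

        // Minimal collector that just prints whatever the source emits.
        source.invoke(new Collector<Tuple1<String>>() {
            @Override
            public void collect(Tuple1<String> record) {
                System.out.println(record);
            }

            @Override
            public void close() {
            }
        });
    }
}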
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Source function that generates a number sequence.
*
*/
public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
private static final long serialVersionUID = 1L;
long from;
long to;
Tuple1<Long> outTuple = new Tuple1<Long>();
public GenSequenceFunction(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
for (long i = from; i <= to; i++) {
outTuple.f0 = i;
collector.collect(outTuple);
}
}
}
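The same direct-invocation pattern works for the sequence source. This sketch (the 1..5 range is arbitrary) sums the generated numbers.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.GenSequenceFunction;
import eu.stratosphere.util.Collector;

public class GenSequenceSketch {
    public static void main(String[] args) throws Exception {
        final long[] sum = { 0 };

        // Generates 1, 2, ..., 5 and accumulates the values in the collector.
        new GenSequenceFunction(1, 5).invoke(new Collector<Tuple1<Long>>() {
            @Override
            public void collect(Tuple1<Long> record) {
                sum[0] += record.f0;
            }

            @Override
            public void close() {
            }
        });

        System.out.println("sum = " + sum[0]); // prints 15
    }
}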
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple;
/**
* Dummy implementation of the SinkFunction writing every tuple to the standard
* output. Used for print.
*
* @param <IN>
* Input tuple type
*/
public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
\ No newline at end of file
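PrintSinkFunction can also be invoked stand-alone; a tiny hedged check (the tuple value is arbitrary) follows.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.PrintSinkFunction;

public class PrintSinkSketch {
    public static void main(String[] args) {
        PrintSinkFunction<Tuple1<String>> sink = new PrintSinkFunction<Tuple1<String>>();
        // Writes the tuple's toString() to standard output, e.g. "(hello)".
        sink.invoke(new Tuple1<String>("hello"));
    }
}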
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.Serializable;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
......@@ -12,13 +12,12 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import java.util.Iterator;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......@@ -32,6 +31,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U
@Override
public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception {
@SuppressWarnings("unchecked")
Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator();
reducer.reduce(iterator, collector);
}
......
......@@ -12,24 +12,27 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
FilterFunction<IN> filterFunction;
public FilterInvokable(FilterFunction<IN> filterFunction) {
this.filterFunction = filterFunction;
}
@Override
public void invoke(StreamRecord record, Collector<IN> collector) throws Exception {
for (int i = 0; i < record.getBatchSize(); i++) {
@SuppressWarnings("unchecked")
IN tuple = (IN) record.getTuple(i);
if (filterFunction.filter(tuple)) {
collector.collect(tuple);
......
......@@ -13,11 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -13,11 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -13,10 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
public abstract class StreamComponent implements Serializable {
public abstract class StreamComponentInvokable implements Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -20,7 +20,10 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponent {
StreamComponentInvokable {
private static final long serialVersionUID = 1L;
public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception;
}
......@@ -20,7 +20,7 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.util.Collector;
public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponent implements
public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponentInvokable implements
Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
......@@ -102,6 +102,7 @@ public class ArrayStreamRecord extends StreamRecord {
* Value to set
* @throws NoSuchTupleException
* , TupleSizeMismatchException
* @return Returns the StreamRecord object
*/
public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
try {
......
......@@ -111,6 +111,7 @@ public class ListStreamRecord extends StreamRecord {
* Value to set
* @throws NoSuchTupleException
* , TupleSizeMismatchException
* @return Returns the StreamRecord object
*/
public StreamRecord setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException {
try {
......
......@@ -13,13 +13,11 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamrecord;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.util.Collector;
public class StreamCollector<T extends Tuple> implements Collector<T> {
......