提交 e416f39f 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] pom update

上级 a2cabbb7
......@@ -7,9 +7,34 @@
<groupId>eu.stratosphere</groupId>
<version>0.2-SNAPSHOT</version>
<artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name>
<packaging>pom</packaging>
<url>http://github.com/stratosphere/stratosphere-streaming</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>stratosphere</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/stratosphere/stratosphere-streaming</url>
<connection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</connection>
<developerConnection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</developerConnection>
</scm>
<packaging>jar</packaging>
<developers>
</developers>
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
<module>stratosphere-streaming-addons</module>
</modules>
<properties>
<stratosphere.version>0.5</stratosphere.version>
......@@ -17,6 +42,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<!--
<repositories>
<repository>
<id>dms-repo</id>
......@@ -25,47 +51,55 @@
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
-->
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
......@@ -78,27 +112,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
<type>jar</type>
</dependency>
</dependencies>
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
* Source for sending messages to a RabbitMQ queue. The source currently only
* support string messages. Other types will be added soon.
*
*/
public class RMQSink extends SinkFunction<Tuple1<String>>{
private static final long serialVersionUID = 1L;
private String QUEUE_NAME;
private String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
public RMQSink(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
public void initializeConnection(){
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
try {
connection = factory.newConnection();
channel = connection.createChannel();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke(Tuple1<String> tuple) {
initializeConnection();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = tuple.f0;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
channel.close();
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.rabbitmq.RMQSink;
import eu.stratosphere.streaming.rabbitmq.RMQSource;
import eu.stratosphere.util.Collector;
public class RMQTest {
public static final class MySink extends SinkFunction<Tuple1<String>> {
@Override
public void invoke(Tuple1<String> tuple) {
result.add(tuple.f0);
}
}
private static Set<String> expected = new HashSet<String>();
private static Set<String> result = new HashSet<String>();
private static void fillExpected() {
expected.add("one");
expected.add("two");
expected.add("three");
expected.add("four");
expected.add("five");
}
@Test
public void RMQTest1() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> dataStream1 = env
.addSource(new RMQSource("localhost", "hello"), 1)
.addSink(new MySink());
DataStream<Tuple1<String>> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new RMQSink("localhost", "hello"));
env.execute();
fillExpected();
assertEquals(expected, result);
}
}
ClusterProvider#clusterId=local1TM
LocalClusterProvider#numTaskTrackers=1
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=local_fs
\ No newline at end of file
ClusterProvider#clusterId=local4TM
LocalClusterProvider#numTaskTrackers=4
ClusterProvider#clusterProviderType=LocalClusterProvider
FilesystemProvider#filesystemType=mini_hdfs
\ No newline at end of file
CoGroupITCase=local1TM,local4TM
CrossITCase=local1TM
MapITCase=local1TM,local4TM
MatchITCase=local1TM
ReduceITCase=local1TM
\ No newline at end of file
TPCHQuery3ITCase=local1TM,local4TM
TPCHQuery4ITCase=local1TM,local4TM
TPCHQuery9ITCase=local1TM,local4TM
TPCHQuery10ITCase=local1TM,local4TM
GlobalSortingITCase=local1TM,local4TM
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-addons</artifactId>
<name>stratosphere-streaming-addons</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
package eu.stratosphere.streaming.addons.kafka;
import java.io.BufferedReader;
import java.io.FileReader;
......@@ -22,6 +22,9 @@ import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Producer for Kafka jobs.
*/
public class KafkaProducer {
static kafka.javaapi.producer.Producer<Integer, String> producer;
static Properties props = new Properties();
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
package eu.stratosphere.streaming.addons.kafka;
import java.util.HashMap;
import java.util.List;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
package eu.stratosphere.streaming.addons.kafka;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
......
......@@ -13,33 +13,28 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.performance;
package eu.stratosphere.streaming.addons.performance;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.examples.wordcount.WordCountCounter;
import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountPerformanceLocal {
public static class Mysink extends SinkFunction<Tuple2<String, Integer>> {
@Override
public void invoke(Tuple2<String, Integer> tuple) {
}
}
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment().setClusterSize(2);
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("/home/strato/stratosphere-distrib/resources/hamlet.txt", 4)
.flatMap(new WordCountPerformanceSplitter(), 2).partitionBy(0)
.map(new WordCountCounter(), 2).addSink(new Mysink(), 2);
.readTextStream("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountPerformanceSplitter(), 1)
.broadcast()
.map(new WordCountCounter(), 3);
dataStream.print();
env.executeCluster();
env.execute();
}
}
......@@ -13,10 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.performance;
import java.io.IOException;
import java.io.ObjectInputStream;
package eu.stratosphere.streaming.addons.performance;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -29,22 +26,27 @@ public class WordCountPerformanceSplitter extends FlatMapFunction<Tuple1<String>
private Tuple1<String> outTuple = new Tuple1<String>();
PerformanceCounter pCounter;
PerformanceCounter pCounter = new
PerformanceCounter("SplitterEmitCounter", 1000, 1000, 30000,
"/home/judit/strato/perf/broadcast4.csv");
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
pCounter.count();
}
}
private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
ois.defaultReadObject();
pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/resources/splitter.csv");
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
package eu.stratosphere.streaming.addons.rabbitmq;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-streaming</artifactId>
<version>0.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>stratosphere-streaming-core</artifactId>
<name>stratosphere-streaming-core</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -17,6 +17,7 @@ package eu.stratosphere.streaming.api;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction;
......@@ -28,9 +29,9 @@ import eu.stratosphere.types.TypeInformation;
public class DataStream<T extends Tuple> {
private static Integer counter = 0;
private final StreamExecutionEnvironment environment;
private TypeInformation<T> type;
private final Random random = new Random();
private String id;
List<String> connectIDs;
List<ConnectionType> ctypes;
......@@ -42,14 +43,13 @@ public class DataStream<T extends Tuple> {
*
* @param environment
*/
protected DataStream(StreamExecutionEnvironment environment, String operatorType) {
protected DataStream(StreamExecutionEnvironment environment) {
if (environment == null) {
throw new NullPointerException("context is null");
}
// TODO add name based on component number an preferable sequential id
counter++;
this.id = operatorType + "-" + counter.toString();
this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
this.environment = environment;
initConnections();
......@@ -61,8 +61,8 @@ public class DataStream<T extends Tuple> {
* @param environment
* @param id
*/
private DataStream(StreamExecutionEnvironment environment, String operatorType, String id) {
this.environment = environment;
private DataStream(StreamExecutionEnvironment environment, String id) {
this(environment);
this.id = id;
}
......@@ -87,7 +87,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream copy.
*/
public DataStream<T> copy() {
DataStream<T> copiedStream = new DataStream<T>(environment, "", getId());
DataStream<T> copiedStream = new DataStream<T>(environment, getId());
copiedStream.type = this.type;
copiedStream.connectIDs = new ArrayList<String>(this.connectIDs);
......@@ -127,7 +127,7 @@ public class DataStream<T extends Tuple> {
}
return returnStream;
}
/**
* Connecting DataStream outputs with each other. The streams connected
* using this operator will be transformed simultaneously. It creates a
......@@ -139,19 +139,19 @@ public class DataStream<T extends Tuple> {
*/
public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy();
for (DataStream<T> stream : streams) {
for(DataStream<T> stream:streams){
addConnection(returnStream, stream);
}
return returnStream;
}
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream){
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams);
returnStream.batchSizes.addAll(stream.batchSizes);
return returnStream;
}
......
......@@ -154,7 +154,7 @@ public class StreamExecutionEnvironment {
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable, int parallelism) {
DataStream<R> returnStream = new DataStream<R>(this, functionName);
DataStream<R> returnStream = new DataStream<R>(this);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
serializeToByteArray(function), parallelism,
......@@ -178,7 +178,7 @@ public class StreamExecutionEnvironment {
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "sink");
DataStream<T> returnStream = new DataStream<T>(this);
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism,
......@@ -241,7 +241,7 @@ public class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this);
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1, 1);
......@@ -260,7 +260,7 @@ public class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this);
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1, 1);
......@@ -355,10 +355,6 @@ public class StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
public void executeCluster() {
ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123);
}
// TODO: Link to DataStream
/**
* Ads a data source thus opening a data stream.
......@@ -371,7 +367,7 @@ public class StreamExecutionEnvironment {
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
DataStream<T> returnStream = new DataStream<T>(this);
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism,
......@@ -394,10 +390,6 @@ public class StreamExecutionEnvironment {
return addSource(new FileSourceFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with
......@@ -412,10 +404,6 @@ public class StreamExecutionEnvironment {
return addSource(new FileStreamFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
}
/**
* Converts object to byte array using default java serialization
*
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples;
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.DataInput;
import java.io.DataOutput;
......
......@@ -27,7 +27,7 @@ import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.api.streamcomponent.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册