提交 a2cabbb7 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] DataStream id update

上级 a31df653
...@@ -7,34 +7,9 @@ ...@@ -7,34 +7,9 @@
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<version>0.2-SNAPSHOT</version> <version>0.2-SNAPSHOT</version>
<artifactId>stratosphere-streaming</artifactId> <artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name> <name>stratosphere-streaming</name>
<packaging>pom</packaging>
<url>http://github.com/stratosphere/stratosphere-streaming</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>stratosphere</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/stratosphere/stratosphere-streaming</url>
<connection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</connection>
<developerConnection>scm:git:git@github.com:stratosphere/stratosphere-streaming.git</developerConnection>
</scm>
<developers> <packaging>jar</packaging>
</developers>
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
<module>stratosphere-streaming-addons</module>
</modules>
<properties> <properties>
<stratosphere.version>0.5</stratosphere.version> <stratosphere.version>0.5</stratosphere.version>
...@@ -55,50 +30,42 @@ ...@@ -55,50 +30,42 @@
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-core</artifactId> <artifactId>stratosphere-core</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId> <artifactId>stratosphere-tests</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-compiler</artifactId> <artifactId>stratosphere-compiler</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId> <artifactId>stratosphere-runtime</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId> <artifactId>stratosphere-clients</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.stratosphere</groupId> <groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId> <artifactId>stratosphere-java</artifactId>
<version>${stratosphere.version}</version> <version>${stratosphere.version}</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.7</version> <version>4.7</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.3.2</version> <version>3.3.2</version>
<type>jar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
...@@ -111,14 +78,27 @@ ...@@ -111,14 +78,27 @@
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
<version>1.2.16</version> <version>1.2.16</version>
<type>jar</type> </dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<version>1.8.5</version> <version>1.8.5</version>
<scope>test</scope> <scope>test</scope>
<type>jar</type>
</dependency> </dependency>
</dependencies> </dependencies>
......
...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api; ...@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.api;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random;
import eu.stratosphere.api.java.functions.FilterFunction; import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.FlatMapFunction;
...@@ -29,9 +28,9 @@ import eu.stratosphere.types.TypeInformation; ...@@ -29,9 +28,9 @@ import eu.stratosphere.types.TypeInformation;
public class DataStream<T extends Tuple> { public class DataStream<T extends Tuple> {
private static Integer counter = 0;
private final StreamExecutionEnvironment environment; private final StreamExecutionEnvironment environment;
private TypeInformation<T> type; private TypeInformation<T> type;
private final Random random = new Random();
private String id; private String id;
List<String> connectIDs; List<String> connectIDs;
List<ConnectionType> ctypes; List<ConnectionType> ctypes;
...@@ -43,13 +42,14 @@ public class DataStream<T extends Tuple> { ...@@ -43,13 +42,14 @@ public class DataStream<T extends Tuple> {
* *
* @param environment * @param environment
*/ */
protected DataStream(StreamExecutionEnvironment environment) { protected DataStream(StreamExecutionEnvironment environment, String operatorType) {
if (environment == null) { if (environment == null) {
throw new NullPointerException("context is null"); throw new NullPointerException("context is null");
} }
// TODO add name based on component number and preferably a sequential id // TODO add name based on component number and preferably a sequential id
this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong()); counter++;
this.id = operatorType + "-" + counter.toString();
this.environment = environment; this.environment = environment;
initConnections(); initConnections();
...@@ -61,8 +61,8 @@ public class DataStream<T extends Tuple> { ...@@ -61,8 +61,8 @@ public class DataStream<T extends Tuple> {
* @param environment * @param environment
* @param id * @param id
*/ */
private DataStream(StreamExecutionEnvironment environment, String id) { private DataStream(StreamExecutionEnvironment environment, String operatorType, String id) {
this(environment); this.environment = environment;
this.id = id; this.id = id;
} }
...@@ -87,7 +87,7 @@ public class DataStream<T extends Tuple> { ...@@ -87,7 +87,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream copy. * @return The DataStream copy.
*/ */
public DataStream<T> copy() { public DataStream<T> copy() {
DataStream<T> copiedStream = new DataStream<T>(environment, getId()); DataStream<T> copiedStream = new DataStream<T>(environment, "", getId());
copiedStream.type = this.type; copiedStream.type = this.type;
copiedStream.connectIDs = new ArrayList<String>(this.connectIDs); copiedStream.connectIDs = new ArrayList<String>(this.connectIDs);
...@@ -127,7 +127,7 @@ public class DataStream<T extends Tuple> { ...@@ -127,7 +127,7 @@ public class DataStream<T extends Tuple> {
} }
return returnStream; return returnStream;
} }
/** /**
* Connecting DataStream outputs with each other. The streams connected * Connecting DataStream outputs with each other. The streams connected
* using this operator will be transformed simultaneously. It creates a * using this operator will be transformed simultaneously. It creates a
...@@ -139,19 +139,19 @@ public class DataStream<T extends Tuple> { ...@@ -139,19 +139,19 @@ public class DataStream<T extends Tuple> {
*/ */
public DataStream<T> connectWith(DataStream<T>... streams) { public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy(); DataStream<T> returnStream = copy();
for(DataStream<T> stream:streams){ for (DataStream<T> stream : streams) {
addConnection(returnStream, stream); addConnection(returnStream, stream);
} }
return returnStream; return returnStream;
} }
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream){ public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
returnStream.connectIDs.addAll(stream.connectIDs); returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes); returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams); returnStream.cparams.addAll(stream.cparams);
returnStream.batchSizes.addAll(stream.batchSizes); returnStream.batchSizes.addAll(stream.batchSizes);
return returnStream; return returnStream;
} }
......
...@@ -154,7 +154,7 @@ public class StreamExecutionEnvironment { ...@@ -154,7 +154,7 @@ public class StreamExecutionEnvironment {
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName, <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
DataStream<T> inputStream, final AbstractFunction function, DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable, int parallelism) { UserTaskInvokable<T, R> functionInvokable, int parallelism) {
DataStream<R> returnStream = new DataStream<R>(this); DataStream<R> returnStream = new DataStream<R>(this, functionName);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName, jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
serializeToByteArray(function), parallelism, serializeToByteArray(function), parallelism,
...@@ -178,7 +178,7 @@ public class StreamExecutionEnvironment { ...@@ -178,7 +178,7 @@ public class StreamExecutionEnvironment {
*/ */
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream, public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) { SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this); DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink", jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism, serializeToByteArray(sinkFunction), parallelism,
...@@ -241,7 +241,7 @@ public class StreamExecutionEnvironment { ...@@ -241,7 +241,7 @@ public class StreamExecutionEnvironment {
* @return The DataStream representing the elements. * @return The DataStream representing the elements.
*/ */
public <X> DataStream<Tuple1<X>> fromElements(X... data) { public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this); DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data), jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1, 1); "elements", serializeToByteArray(data[0]), 1, 1);
...@@ -260,7 +260,7 @@ public class StreamExecutionEnvironment { ...@@ -260,7 +260,7 @@ public class StreamExecutionEnvironment {
* @return The DataStream representing the elements. * @return The DataStream representing the elements.
*/ */
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) { public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this); DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data), jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1, 1); "elements", serializeToByteArray(data.toArray()[0]), 1, 1);
...@@ -355,6 +355,10 @@ public class StreamExecutionEnvironment { ...@@ -355,6 +355,10 @@ public class StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph()); ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
} }
/**
 * Runs the job graph built so far on a cluster, using the address that was
 * previously hard-coded in this method ("10.1.3.150", port 6123).
 *
 * Kept for backward compatibility; delegates to
 * {@link #executeCluster(String, int)}.
 */
public void executeCluster() {
	executeCluster("10.1.3.150", 6123);
}

/**
 * Passes the current job graph from the job graph builder to
 * ClusterUtil.runOnLocalCluster together with the given host and port.
 *
 * @param host
 *            host name or IP handed unchanged to ClusterUtil
 *            (presumably the job manager address - TODO confirm)
 * @param port
 *            port handed unchanged to ClusterUtil
 *            (presumably the job manager RPC port - TODO confirm)
 */
public void executeCluster(String host, int port) {
	ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), host, port);
}
// TODO: Link to DataStream // TODO: Link to DataStream
/** /**
* Adds a data source, thus opening a data stream. * Adds a data source, thus opening a data stream.
...@@ -367,7 +371,7 @@ public class StreamExecutionEnvironment { ...@@ -367,7 +371,7 @@ public class StreamExecutionEnvironment {
*/ */
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction, public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) { int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this); DataStream<T> returnStream = new DataStream<T>(this, "source");
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source", jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism, serializeToByteArray(sourceFunction), parallelism,
...@@ -390,6 +394,10 @@ public class StreamExecutionEnvironment { ...@@ -390,6 +394,10 @@ public class StreamExecutionEnvironment {
return addSource(new FileSourceFunction(filePath), 1); return addSource(new FileSourceFunction(filePath), 1);
} }
/**
 * Registers a FileSourceFunction built from the given path as a source,
 * using the supplied parallelism.
 *
 * @param filePath
 *            path passed to the FileSourceFunction constructor
 * @param parallelism
 *            parallelism value forwarded to addSource
 * @return the DataStream returned by addSource
 */
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
	FileSourceFunction fileSource = new FileSourceFunction(filePath);
	return addSource(fileSource, parallelism);
}
/** /**
* Creates a DataStream that represents the Strings produced by reading the * Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with * given file line wise multiple times(infinite). The file will be read with
...@@ -404,6 +412,10 @@ public class StreamExecutionEnvironment { ...@@ -404,6 +412,10 @@ public class StreamExecutionEnvironment {
return addSource(new FileStreamFunction(filePath), 1); return addSource(new FileStreamFunction(filePath), 1);
} }
/**
 * Registers a FileStreamFunction built from the given path as a source,
 * using the supplied parallelism.
 *
 * @param filePath
 *            path passed to the FileStreamFunction constructor
 * @param parallelism
 *            parallelism value forwarded to addSource
 * @return the DataStream returned by addSource
 */
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
	FileStreamFunction streamSource = new FileStreamFunction(filePath);
	return addSource(streamSource, parallelism);
}
/** /**
* Converts object to byte array using default java serialization * Converts object to byte array using default java serialization
* *
......
...@@ -27,7 +27,7 @@ import eu.stratosphere.nephele.io.RecordWriter; ...@@ -27,7 +27,7 @@ import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamcomponent.DummyIS; import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent; package eu.stratosphere.streaming.examples;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册