diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index bfc12c32ea058ff7b1e5a84557d194dcc454eb25..3dd0f179b0f5fc6fc6128138e96602857ebe0f7a 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -87,7 +87,12 @@
org.apache.kafka
kafka_2.10
- 0.8.1.1
+ 0.8.0
+
+
+ org.jblas
+ jblas
+ 1.2.3
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
index 91fe668ad2e2813a048aad19b138b33417fedf3d..731b7969d967035e9f70a3e3f4530b407011a706 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
@@ -36,6 +36,7 @@ public class DataStream {
List connectIDs;
List ctypes;
List cparams;
+ List batchSizes;
protected DataStream() {
// TODO implement
@@ -63,16 +64,33 @@ public class DataStream {
ctypes.add(ConnectionType.SHUFFLE);
cparams = new ArrayList();
cparams.add(0);
+ batchSizes = new ArrayList();
+ batchSizes.add(1);
+
}
public String getId() {
return id;
}
+ public DataStream batch(int batchSize) {
+
+ if (batchSize < 1) {
+ throw new IllegalArgumentException("Batch size must be positive.");
+ }
+
+ for (int i = 0; i < batchSizes.size(); i++) {
+ batchSizes.set(i, batchSize);
+ }
+ context.setBatchSize(this);
+ return this;
+ }
+
public DataStream connectWith(DataStream stream) {
connectIDs.addAll(stream.connectIDs);
ctypes.addAll(stream.ctypes);
cparams.addAll(stream.cparams);
+ batchSizes.addAll(stream.batchSizes);
return this;
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index fe9645c71aa3bdde018dbe3acfcf2ca2a3dd58bc..692efc2b72dc4764806463fdf72ffaca52cfce8c 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -65,6 +65,7 @@ public class JobGraphBuilder {
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
private int batchSize;
+ private long batchTimeout;
/**
* Creates a new JobGraph with the given name
@@ -85,7 +86,6 @@ public class JobGraphBuilder {
log.debug("JobGraph created");
}
this.faultToleranceType = faultToleranceType;
- batchSize = 1;
}
/**
@@ -99,9 +99,11 @@ public class JobGraphBuilder {
this(jobGraphName, FaultToleranceType.NONE);
}
- public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) {
+ public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
+ int defaultBatchSize, long defaultBatchTimeoutMillis) {
this(jobGraphName, faultToleranceType);
- this.batchSize = batchSize;
+ this.batchSize = defaultBatchSize;
+ this.batchTimeout = defaultBatchTimeoutMillis;
}
/**
@@ -247,7 +249,6 @@ public class JobGraphBuilder {
* @param component
* AbstractJobVertex associated with the component
*/
-
private Configuration setComponent(String componentName,
final Class extends StreamComponent> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
@@ -263,6 +264,7 @@ public class JobGraphBuilder {
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
+ config.setLong("batchTimeout", batchTimeout);
// config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
@@ -302,6 +304,11 @@ public class JobGraphBuilder {
return config;
}
+ public void setBatchSize(String componentName, int batchSize) {
+ Configuration config = components.get(componentName).getConfiguration();
+ config.setInteger("batchSize", batchSize);
+ }
+
/**
* Adds serialized invokable object to the JobVertex configuration
*
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java
old mode 100755
new mode 100644
index b8c98f233d07a957bedc277b7f0f1767d0eeb738..ff30915b7343443468969384eb55168b8680f020
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java
@@ -1,28 +1,28 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.streaming.api;
-
-import java.io.Serializable;
-
-import eu.stratosphere.api.java.tuple.Tuple;
-
-public abstract class SinkFunction implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public abstract void invoke(IN tuple);
-
-}
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.api;
+
+import java.io.Serializable;
+
+import eu.stratosphere.api.java.tuple.Tuple;
+
+public abstract class SinkFunction implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public abstract void invoke(IN tuple);
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
index 48be5d40e6594eae9c813b55630bde8640aaace5..03a24e7e9cc64a09a201fba88079ec5aafda867f 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
@@ -28,37 +28,54 @@ public class StreamCollector implements Collector {
protected StreamRecord streamRecord;
protected int batchSize;
+ protected long batchTimeout;
protected int counter = 0;
protected int channelID;
+ private long timeOfLastRecordEmitted = System.currentTimeMillis();;
private List> outputs;
- public StreamCollector(int batchSize, int channelID,
+ public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate serializationDelegate,
List> outputs) {
this.batchSize = batchSize;
+ this.batchTimeout = batchTimeout;
this.streamRecord = new ArrayStreamRecord(batchSize);
this.streamRecord.setSeralizationDelegate(serializationDelegate);
this.channelID = channelID;
this.outputs = outputs;
}
- public StreamCollector(int batchSize, int channelID,
+ public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate serializationDelegate) {
- this(batchSize, channelID, serializationDelegate, null);
+ this(batchSize, batchTimeout, channelID, serializationDelegate, null);
}
+ // TODO reconsider emitting mechanism at timeout (find a place to timeout)
@Override
public void collect(T tuple) {
streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
counter++;
+
if (counter >= batchSize) {
- counter = 0;
- streamRecord.setId(channelID);
emit(streamRecord);
+ // timeOfLastRecordEmitted = System.currentTimeMillis();
+ } else {
+ // timeout();
+ }
+ }
+
+ public void timeout() {
+ if (timeOfLastRecordEmitted + batchTimeout < System.currentTimeMillis()) {
+ StreamRecord truncatedRecord = new ArrayStreamRecord(streamRecord, counter);
+ emit(truncatedRecord);
+ timeOfLastRecordEmitted = System.currentTimeMillis();
}
}
private void emit(StreamRecord streamRecord) {
+ counter = 0;
+ streamRecord.setId(channelID);
+
if (outputs == null) {
System.out.println(streamRecord);
} else {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
index 197d66fa1eac64e61735b7e3590c469329ea907b..9b64232e6ce3742282e8f9ad21f1530468920431 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
@@ -32,16 +32,20 @@ import eu.stratosphere.util.Collector;
//TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
-
- public StreamExecutionEnvironment(int batchSize) {
- if (batchSize < 1) {
+
+ public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
+ if (defaultBatchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
}
- jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, batchSize);
+ if (defaultBatchTimeoutMillis < 1) {
+ throw new IllegalArgumentException("Batch timeout must be positive.");
+ }
+ jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE,
+ defaultBatchSize, defaultBatchTimeoutMillis);
}
public StreamExecutionEnvironment() {
- this(1);
+ this(1, 1000);
}
private static class DummySource extends UserSourceInvokable> {
@@ -54,11 +58,19 @@ public class StreamExecutionEnvironment {
}
}
}
-
+
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
+ public void setBatchSize(DataStream inputStream) {
+
+ for (int i = 0; i < inputStream.connectIDs.size(); i++) {
+ jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i),
+ inputStream.batchSizes.get(i));
+ }
+ }
+
private void connectGraph(DataStream inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
@@ -80,8 +92,10 @@ public class StreamExecutionEnvironment {
}
}
-
- public DataStream addFunction(String functionName, DataStream inputStream, final AbstractFunction function, UserTaskInvokable functionInvokable) {
+
+ public DataStream addFunction(String functionName,
+ DataStream inputStream, final AbstractFunction function,
+ UserTaskInvokable functionInvokable) {
DataStream returnStream = new DataStream(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -93,14 +107,13 @@ public class StreamExecutionEnvironment {
e.printStackTrace();
}
- jobGraphBuilder.setTask(returnStream.getId(), functionInvokable,
- functionName, baos.toByteArray());
+ jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
+ baos.toByteArray());
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
-
public DataStream addSink(DataStream inputStream,
SinkFunction sinkFunction) {
@@ -164,8 +177,8 @@ public class StreamExecutionEnvironment {
public DataStream> readTextFile(String path) {
return addSource(new FileSourceFunction(path));
- }
-
+ }
+
public DataStream> addDummySource() {
DataStream> returnStream = new DataStream>(this);
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
index dee5a8b5d3e88f6205562c41bd6101a96a356f9d..e413b50bf4784b689ddc4cf9d93348053ab21571 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
@@ -107,8 +107,11 @@ public final class StreamComponentHelper {
public StreamCollector setCollector(Configuration taskConfiguration, int id,
List> outputs) {
- int batchSize = taskConfiguration.getInteger("batchSize", -1);
- collector = new StreamCollector(batchSize, id, outSerializationDelegate, outputs);
+
+ int batchSize = taskConfiguration.getInteger("batchSize", 1);
+ long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
+ collector = new StreamCollector(batchSize, batchTimeout, id,
+ outSerializationDelegate, outputs);
return collector;
}
@@ -121,7 +124,7 @@ public final class StreamComponentHelper {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
Object function = in.readObject();
-
+
if (operatorName.equals("flatMap")) {
setSerializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
@@ -151,21 +154,21 @@ public final class StreamComponentHelper {
}
}
-
+
private void setSerializer(Object function, Class extends AbstractFunction> clazz) {
- inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
- function.getClass(), 0, null, null);
+ inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
+ 0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer);
- outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
- function.getClass(), 1, null, null);
+ outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
+ 1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate(outTupleSerializer);
}
-
+
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java
index e2765936131c42fe75da0c9e17ed6bf9a73157af..f541c98995ceb86e84b187a762e1476f88dce7f4 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java
@@ -15,20 +15,21 @@
package eu.stratosphere.streaming.api.streamcomponent;
-import eu.stratosphere.streaming.api.StreamCollector;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import java.util.ArrayList;
+
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
+import eu.stratosphere.util.Collector;
-public class StreamWindowTask extends UserTaskInvokable {
+public class StreamWindowTask extends FlatMapFunction {
private static final long serialVersionUID = 1L;
-
+
private int computeGranularity;
private int windowFieldId = 1;
- private StreamRecord tempRecord;
+ private ArrayList tempArrayList;
private SlidingWindowState window;
private MutableTableState sum;
private long initTimestamp = -1;
@@ -44,43 +45,42 @@ public class StreamWindowTask extends UserTaskInvokable {
sum.put("sum", 0);
}
- private void incrementCompute(StreamRecord record){}
- private void decrementCompute(StreamRecord record){}
- private void produceRecord(long progress){}
+ private void incrementCompute(ArrayList tupleArray) {}
+
+ private void decrementCompute(ArrayList tupleArray) {}
+
+ private void produceOutput(long progress, Collector out) {}
@Override
- public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
- int numTuple = record.getBatchSize();
- int tupleIndex = 0;
- for (int i = 0; i < numTuple; ++i) {
- long progress = record.getTuple(i).getField(windowFieldId);
- if (initTimestamp == -1) {
- initTimestamp = progress;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new ArrayStreamRecord(record.getBatchSize());
- } else {
- if (progress > nextTimestamp) {
+ public void flatMap(Tuple value, Collector out) throws Exception {
+ // TODO Auto-generated method stub
+ long progress = value.getField(windowFieldId);
+ if (initTimestamp == -1) {
+ initTimestamp = progress;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempArrayList = new ArrayList();
+ } else {
+ if (progress > nextTimestamp) {
+ if (window.isFull()) {
+ ArrayList expiredArrayList = window.popFront();
+ incrementCompute(tempArrayList);
+ decrementCompute(expiredArrayList);
+ window.pushBack(tempArrayList);
+ if (window.isEmittable()) {
+ produceOutput(progress, out);
+ }
+ } else {
+ incrementCompute(tempArrayList);
+ window.pushBack(tempArrayList);
if (window.isFull()) {
- StreamRecord expiredRecord = window.popFront();
- incrementCompute(tempRecord);
- decrementCompute(expiredRecord);
- window.pushBack(tempRecord);
- if (window.isEmittable()) {
- produceRecord(progress);
- }
- } else {
- incrementCompute(tempRecord);
- window.pushBack(tempRecord);
- if (window.isFull()) {
- produceRecord(progress);
- }
+ produceOutput(progress, out);
}
- initTimestamp = nextTimestamp;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new ArrayStreamRecord(record.getBatchSize());
}
- tempRecord.setTuple(tupleIndex++, record.getTuple(i));
+ initTimestamp = nextTimestamp;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempArrayList = new ArrayList();
}
- }
+ tempArrayList.add(value);
+ }
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
index e8c53bb80146ddcb1d219f387f1742df8b63d3f5..3750a4b101090f42889303974f36bf36d833294a 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
@@ -45,14 +45,19 @@ public class ArrayStreamRecord extends StreamRecord {
}
public ArrayStreamRecord(StreamRecord record) {
- tupleBatch = new Tuple[record.getBatchSize()];
+ this(record, record.getBatchSize());
+ }
+
+ public ArrayStreamRecord(StreamRecord record, int truncatedSize) {
+ tupleBatch = new Tuple[truncatedSize];
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
- for (int i = 0; i < record.getBatchSize(); ++i) {
+ for (int i = 0; i < truncatedSize; ++i) {
this.tupleBatch[i] = copyTuple(record.getTuple(i));
}
this.batchSize = tupleBatch.length;
}
+
/**
* Creates a new batch of records containing the given Tuple array as
* elements
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java
deleted file mode 100644
index b33cc0e637b3b8f0fcd50f34c76b55623ec97834..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.streaming.examples.iterative;
-
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Level;
-
-import eu.stratosphere.client.minicluster.NepheleMiniCluster;
-import eu.stratosphere.client.program.Client;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.streaming.api.JobGraphBuilder;
-import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
-import eu.stratosphere.streaming.util.LogUtils;
-
-public class IterativeLocal {
-
- public static JobGraph getJobGraph() {
- JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
- graphBuilder.setSource("IterativeSource", IterativeSource.class);
- graphBuilder.setTask("IterativeParallel", IterativeParallel.class, 1, 1);
- graphBuilder.setTask("IterativeStateHolder", IterativeStateHolder.class);
- graphBuilder.setSink("IterativeSink", IterativeSink.class);
-
- graphBuilder.fieldsConnect("IterativeSource", "IterativeParallel", 1);
- graphBuilder.fieldsConnect("IterativeParallel", "IterativeStateHolder", 1);
- graphBuilder.globalConnect("IterativeStateHolder", "IterativeSink");
-
- return graphBuilder.getJobGraph();
- }
-
- public static void main(String[] args) {
-
- LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
-
- try {
-
- JobGraph jG = getJobGraph();
- Configuration configuration = jG.getJobConfiguration();
-
- if (args.length == 0) {
- args = new String[] { "local" };
- }
-
- if (args[0].equals("local")) {
- System.out.println("Running in Local mode");
- NepheleMiniCluster exec = new NepheleMiniCluster();
-
- exec.start();
-
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
-
- client.run(jG, true);
-
- exec.stop();
-
- } else if (args[0].equals("cluster")) {
- System.out.println("Running in Cluster2 mode");
-
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
- configuration);
-
- client.run(jG, true);
-
- }
-
- } catch (Exception e) {
- System.out.println(e);
- }
-
- }
-}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java
index 9ddca15c6455c420a6d8bc20fa1ad2901f96f1a5..4d5c2cb60862f8703a80bdec90531824422dc77b 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java
@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
+import org.apache.log4j.Level;
+
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.streaming.util.ClusterUtil;
+import eu.stratosphere.streaming.util.LogUtils;
+
public class CollaborativeFilteringLocal {
+
+ public static JobGraph getJobGraph() {
+ JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
+ graphBuilder.setSource("Source", new CollaborativeFilteringSource());
+ graphBuilder.setTask("Task", new CollaborativeFilteringTask(), 1, 1);
+ graphBuilder.setSink("Sink", new CollaborativeFilteringSink());
- public static void main(String[] args) {
- // TODO Auto-generated method stub
+ graphBuilder.fieldsConnect("Source", "Task", 0);
+ graphBuilder.shuffleConnect("Task", "Sink");
+ return graphBuilder.getJobGraph();
}
+ public static void main(String[] args) {
+
+ LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+ ClusterUtil.runOnMiniCluster(getJobGraph());
+
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..b450a941f5c0a4f8daa3d7eb94649503ccea98a1
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSink.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
+
+import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class CollaborativeFilteringSink extends UserSinkInvokable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
+ System.out.println("received record...");
+ int tupleNum = record.getNumOfTuples();
+ System.out.println("============================================");
+ for (int i = 0; i < tupleNum; ++i) {
+ System.out.println("name=" + record.getField(i, 0) + ", grade="
+ + record.getField(i, 1) + ", salary="
+ + record.getField(i, 2));
+ }
+ System.out.println("============================================");
+ }
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..2003145ff81b0dc95c8235ade84f092adc1b60ba
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSource.java
@@ -0,0 +1,54 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class CollaborativeFilteringSource extends UserSourceInvokable {
+ private static final long serialVersionUID = 1L;
+
+ private BufferedReader br = null;
+ private String line = new String();
+ private StreamRecord outRecord = new StreamRecord(new Tuple3());
+
+ @Override
+ public void invoke() throws Exception {
+ // TODO Auto-generated method stub
+ br = new BufferedReader(new FileReader(
+ "src/test/resources/testdata/MovieLens100k.data"));
+ while (true) {
+ line = br.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line != "") {
+ String[] items=line.split("\t");
+ outRecord.setInteger(0, Integer.valueOf(items[0]));
+ outRecord.setInteger(1, Integer.valueOf(items[1]));
+ outRecord.setInteger(2, Integer.valueOf(items[2]));
+ emit(outRecord);
+ performanceCounter.count();
+ }
+ line = br.readLine();
+ }
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..aad19ebd3d824d28a43318dd35572dc418f0de0e
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringTask.java
@@ -0,0 +1,52 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
+
+
+import java.util.HashMap;
+
+import org.jblas.DoubleMatrix;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class CollaborativeFilteringTask extends UserTaskInvokable {
+
+ private static final long serialVersionUID = 1L;
+ private StreamRecord outRecord = new StreamRecord(new Tuple1());
+ HashMap rowIndex=new HashMap();
+ HashMap columnIndex=new HashMap();
+ DoubleMatrix userItem=new DoubleMatrix(1000, 2000);
+ DoubleMatrix coOccurence=new DoubleMatrix(2000, 2000);
+ @Override
+ public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
+ int userId = record.getInteger(0, 0);
+ int itemId = record.getInteger(0, 1);
+ int rating = record.getInteger(0, 2);
+ if(!rowIndex.containsKey(userId)){
+ rowIndex.put(userId, rowIndex.size());
+ }
+ if(!columnIndex.containsKey(itemId)){
+ columnIndex.put(itemId, columnIndex.size());
+ }
+ userItem.put(rowIndex.get(userId), columnIndex.get(itemId), rating);
+
+ //outRecord.setString(0, line);
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java
deleted file mode 100644
index 2286ffff642eb01e0e56762d70d4093b086a41df..0000000000000000000000000000000000000000
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.streaming.examples.iterative.kmeans;
-
-public class KMeansLocal {
-
- public static void main(String[] args) {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSink.java
similarity index 69%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSink.java
index 4200bffdc361b0c9a141c6b8c3a8f01f06c8a527..47ca1e74a34bee61bd9f29e090b5494da271511d 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSink.java
@@ -13,22 +13,20 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.examples.iterative;
+package eu.stratosphere.streaming.examples.iterative.kmeans;
-
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-public class IterativeStateHolder extends UserTaskInvokable {
-
- private static final long serialVersionUID = -3042489460184024483L;
-
- public IterativeStateHolder() {
- }
+public class KMeansSink extends UserSinkInvokable {
+ private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
-
+ //int tupleNum = record.getNumOfTuples();
+ System.out.println("============================================");
+ System.out.println("record=" + record.getString(0, 0));
+ System.out.println("============================================");
}
-}
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..8b34d71ad609286bb1f5f69789d2edcba7145f13
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSource.java
@@ -0,0 +1,84 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.kmeans;
+
+import java.util.Random;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class KMeansSource extends UserSourceInvokable {
+ private static final long serialVersionUID = 1L;
+ private static final long DEFAULT_SEED = 4650285087650871364L;
+ private Random random = new Random(DEFAULT_SEED);
+ private StreamRecord outRecord = new StreamRecord(new Tuple1());
+ private int numCenter;
+ private int dimension;
+ private double absoluteStdDev;
+ private double range;
+ private StringBuilder buffer = new StringBuilder();
+
+ public KMeansSource(int numCenter, int dimension, double stddev, double range){
+ this.numCenter=numCenter;
+ this.dimension=dimension;
+ this.absoluteStdDev = stddev * range;
+ this.range=range;
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ // TODO Auto-generated method stub
+ double[][] means = uniformRandomCenters(random, numCenter, dimension, range);
+ double[] point = new double[dimension];
+ int nextCentroid = 0;
+
+ while (true) {
+ // generate a point for the current centroid
+ double[] centroid = means[nextCentroid];
+ for (int d = 0; d < dimension; d++) {
+ point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
+ }
+ nextCentroid = (nextCentroid + 1) % numCenter;
+ String pointString=generatePointString(point);
+ outRecord.setString(0, pointString);
+ emit(outRecord);
+ }
+ }
+
+ private double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+ final double halfRange = range / 2;
+ final double[][] points = new double[num][dimensionality];
+ for (int i = 0; i < num; i++) {
+ for (int dim = 0; dim < dimensionality; dim ++) {
+ points[i][dim] = (rnd.nextDouble() * range) - halfRange;
+ }
+ }
+ return points;
+ }
+
+ private String generatePointString(double[] point){
+ buffer.setLength(0);
+ for (int j = 0; j < dimension; j++) {
+ buffer.append(point[j]);
+ if(j < dimension - 1) {
+ buffer.append(" ");
+ }
+ }
+ return buffer.toString();
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..5246c427fce8bcd4cbfeb0e570498be6881ea772
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansTask.java
@@ -0,0 +1,42 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.kmeans;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class KMeansTask extends UserTaskInvokable {
+
+ private static final long serialVersionUID = 1L;
+ private StreamRecord outRecord = new StreamRecord(new Tuple1());
+ private double[] point=null;
+ public KMeansTask(int dimension){
+ point = new double[dimension];
+ }
+
+ @Override
+ public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
+ String[] pointStr = record.getString(0, 0).split(" ");
+ for(int i=0; i> _vertices = null;
+ public Graph() {
+ _vertices = new HashMap>();
}
+ public void insertDirectedEdge(int sourceNode, int targetNode) {
+ if (!_vertices.containsKey(sourceNode)) {
+ _vertices.put(sourceNode, new HashSet());
+ }
+ _vertices.get(sourceNode).add(targetNode);
+ }
+
+ public void insertUndirectedEdge(int sourceNode, int targetNode){
+ if(!_vertices.containsKey(sourceNode)){
+ _vertices.put(sourceNode, new HashSet());
+ }
+ if(!_vertices.containsKey(targetNode)){
+ _vertices.put(targetNode, new HashSet());
+ }
+ _vertices.get(sourceNode).add(targetNode);
+ _vertices.get(targetNode).add(sourceNode);
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankLocal.java
new file mode 100644
index 0000000000000000000000000000000000000000..7865980d1c172974ade41c9e73d0217f3c6a7943
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankLocal.java
@@ -0,0 +1,46 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.pagerank;
+
+import org.apache.log4j.Level;
+
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.streaming.util.ClusterUtil;
+import eu.stratosphere.streaming.util.LogUtils;
+
+public class PageRankLocal {
+
+ public static JobGraph getJobGraph() {
+ JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
+ graphBuilder.setSource("Source", new PageRankSource());
+ graphBuilder.setTask("Task", new PageRankTask(), 1, 1);
+ graphBuilder.setSink("Sink", new PageRankSink());
+
+ graphBuilder.fieldsConnect("Source", "Task", 0);
+ graphBuilder.shuffleConnect("Task", "Sink");
+
+ return graphBuilder.getJobGraph();
+ }
+
+ public static void main(String[] args) {
+
+ LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+ ClusterUtil.runOnMiniCluster(getJobGraph());
+
+ }
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSink.java
similarity index 87%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSink.java
index 6ea080b62621b374543361e88104c04a232bb505..8fcf3f5e896f1a25886e48a27398d039dbf63ef3 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSink.java
@@ -13,17 +13,17 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.examples.iterative;
+package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-public class IterativeSink extends UserSinkInvokable {
-
- private static final long serialVersionUID = -1989637817643875304L;
+public class PageRankSink extends UserSinkInvokable {
+ private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
@@ -32,6 +32,6 @@ public class IterativeSink extends UserSinkInvokable {
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
- System.out.println("============================================");
+ System.out.println("============================================");
}
-}
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..d1407db6f8c6392031fadfd5e0ab0c9ae3bd52ea
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSource.java
@@ -0,0 +1,52 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.pagerank;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class PageRankSource extends UserSourceInvokable {
+ private static final long serialVersionUID = 1L;
+
+ private BufferedReader br = null;
+ private StreamRecord outRecord = new StreamRecord(new Tuple2());
+
+ @Override
+ public void invoke() throws Exception {
+ // TODO Auto-generated method stub
+ br = new BufferedReader(new FileReader(
+ "src/test/resources/testdata/ASTopology.data"));
+ while (true) {
+ String line = br.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line != "") {
+ String[] link=line.split(":");
+ outRecord.setInteger(0, Integer.valueOf(link[0]));
+ outRecord.setInteger(0, Integer.valueOf(link[1]));
+ emit(outRecord);
+ performanceCounter.count();
+ }
+ line = br.readLine();
+ }
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..c2e02909d2559347e6ff638704b177888cfd888b
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankTask.java
@@ -0,0 +1,39 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.examples.iterative.pagerank;
+
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+
+public class PageRankTask extends UserTaskInvokable {
+
+ private static final long serialVersionUID = 1L;
+ private StreamRecord outRecord = new StreamRecord(new Tuple1());
+ private Graph linkGraph = new Graph();
+
+ @Override
+ public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
+ Integer sourceNode = record.getInteger(0, 0);
+ Integer targetNode = record.getInteger(0, 1);
+ // set the input graph.
+ linkGraph.insertDirectedEdge(sourceNode, targetNode);
+
+ //outRecord.setString(0, line);
+ }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/Graph.java
similarity index 52%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/Graph.java
index 0d97061a34c5b96f7e183fa22af1707363dd66d1..d5c93bf82248048f40b095ce7176502fd51db877 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/Graph.java
@@ -13,34 +13,35 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.state;
+package eu.stratosphere.streaming.examples.iterative.sssp;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.index.IndexPair;
+public class Graph {
+ public Map> _vertices = null;
-public class LogTableStateIterator implements TableStateIterator{
-
- private Iterator> iterator;
- private HashMap> blockList;
- public LogTableStateIterator(Iterator> iter, HashMap> blocks){
- iterator=iter;
- blockList=blocks;
- }
- @Override
- public boolean hasNext() {
- // TODO Auto-generated method stub
- return false;
+ public Graph() {
+ _vertices = new HashMap>();
}
- @Override
- public Tuple2 next() {
- // TODO Auto-generated method stub
- return null;
+ public void insertDirectedEdge(int sourceNode, int targetNode) {
+ if (!_vertices.containsKey(sourceNode)) {
+ _vertices.put(sourceNode, new HashSet());
+ }
+ _vertices.get(sourceNode).add(targetNode);
}
+ public void insertUndirectedEdge(int sourceNode, int targetNode){
+ if(!_vertices.containsKey(sourceNode)){
+ _vertices.put(sourceNode, new HashSet());
+ }
+ if(!_vertices.containsKey(targetNode)){
+ _vertices.put(targetNode, new HashSet());
+ }
+ _vertices.get(sourceNode).add(targetNode);
+ _vertices.get(targetNode).add(sourceNode);
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java
index 6c06a467e11ab02b4c35f785b708be3bdc2de91e..f6b32c4c0af56727f8dda02c604138d2920e9f3a 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java
@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.sssp;
+import org.apache.log4j.Level;
+
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.streaming.util.ClusterUtil;
+import eu.stratosphere.streaming.util.LogUtils;
+
public class SSSPLocal {
+
+ public static JobGraph getJobGraph() {
+ JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
+ graphBuilder.setSource("Source", new SSSPSource());
+ graphBuilder.setTask("Task", new SSSPTask(), 1, 1);
+ graphBuilder.setSink("Sink", new SSSPSink());
- public static void main(String[] args) {
- // TODO Auto-generated method stub
+ graphBuilder.fieldsConnect("Source", "Task", 0);
+ graphBuilder.shuffleConnect("Task", "Sink");
+ return graphBuilder.getJobGraph();
}
+ public static void main(String[] args) {
+
+ LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+ ClusterUtil.runOnMiniCluster(getJobGraph());
+
+ }
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSink.java
similarity index 90%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSink.java
index 497d13e607df03503a0e849e093f79845debb67a..b55e634c0263f967344e1aabf2596cb217c286d2 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSink.java
@@ -13,16 +13,17 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.examples.window.join;
+package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-public class WindowJoinSink extends UserSinkInvokable {
+public class SSSPSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
+ // TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
@@ -31,6 +32,6 @@ public class WindowJoinSink extends UserSinkInvokable {
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
- System.out.println("============================================");
+ System.out.println("============================================");
}
-}
+}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSource.java
similarity index 53%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSource.java
index ace51bec8b9ca390476a1523c03b6a15ec66145d..fe7a4bc8539a884fa6d855f8c12a1772688594d0 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSource.java
@@ -13,18 +13,40 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.examples.iterative;
+package eu.stratosphere.streaming.examples.iterative.sssp;
-import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
-
-public class IterativeSource extends UserSourceInvokable {
+import java.io.BufferedReader;
+import java.io.FileReader;
- private static final long serialVersionUID = 8983174839600079890L;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+public class SSSPSource extends UserSourceInvokable {
+ private static final long serialVersionUID = 1L;
+
+ private BufferedReader br = null;
+ private StreamRecord outRecord = new StreamRecord(new Tuple2());
+
@Override
public void invoke() throws Exception {
// TODO Auto-generated method stub
-
+ br = new BufferedReader(new FileReader(
+ "src/test/resources/testdata/ASTopology.data"));
+ while (true) {
+ String line = br.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line != "") {
+ String[] link=line.split(":");
+ outRecord.setInteger(0, Integer.valueOf(link[0]));
+ outRecord.setInteger(0, Integer.valueOf(link[1]));
+ emit(outRecord);
+ performanceCounter.count();
+ }
+ line = br.readLine();
+ }
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPTask.java
similarity index 67%
rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java
rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPTask.java
index f6df59887fdc6028bf6cb923e09dd77ca12f2387..a26cb246e27b1e7df37519c35905d8088b2bcc5c 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPTask.java
@@ -13,22 +13,27 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.examples.iterative;
-
+package eu.stratosphere.streaming.examples.iterative.sssp;
+import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-public class IterativeParallel extends UserTaskInvokable {
-
- private static final long serialVersionUID = -3042489460184024483L;
-
- public IterativeParallel() {
- }
+public class SSSPTask extends UserTaskInvokable {
+ private static final long serialVersionUID = 1L;
+ private StreamRecord outRecord = new StreamRecord(new Tuple1());
+ private Graph linkGraph = new Graph();
+
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
-
+ Integer sourceNode = record.getInteger(0, 0);
+ Integer targetNode = record.getInteger(0, 1);
+ // set the input graph.
+ linkGraph.insertDirectedEdge(sourceNode, targetNode);
+
+ //outRecord.setString(0, line);
}
+
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java
index 71ef778133b5d72705e1f4acb5e030fc7af4bbd8..3bdd23bb4046eb9f6488a2f218111def39f1a3eb 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -14,134 +14,188 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
-import eu.stratosphere.api.java.functions.MapFunction;
+import java.net.InetSocketAddress;
+
+import org.apache.log4j.Level;
+
+import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.streaming.api.DataStream;
-import eu.stratosphere.streaming.api.SinkFunction;
-import eu.stratosphere.streaming.api.SourceFunction;
-import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
-import eu.stratosphere.util.Collector;
+import eu.stratosphere.client.minicluster.NepheleMiniCluster;
+import eu.stratosphere.client.program.Client;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
// Source for feeding new data for prediction
- public static class NewDataSource extends SourceFunction> {
+ public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
+ StreamRecord record = new StreamRecord(new Tuple1(1));
+
@Override
- public void invoke(Collector> collector) throws Exception {
+ public void invoke() throws Exception {
+
while (true) {
- collector.collect(getNewData());
+ record.setTuple(getNewData());
+ emit(record);
}
+
}
// Method for pulling new data for prediction
- private Tuple1 getNewData() throws InterruptedException {
+ private Tuple getNewData() throws InterruptedException {
return new Tuple1(1);
}
}
// Source for feeding new training data for partial model building
- public static class TrainingDataSource extends SourceFunction> {
+ public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// Number of tuples grouped for building partial model
- // TODO: batch training data
private final int BATCH_SIZE = 1000;
+ StreamRecord record = new StreamRecord(1, BATCH_SIZE);
+
@Override
- public void invoke(Collector> collector) throws Exception {
+ public void invoke() throws Exception {
+
+ record.initRecords();
while (true) {
// Group the predefined number of records in a streamrecord then
// emit for model building
- collector.collect(getTrainingData());;
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ record.setTuple(i, getTrainingData());
+ }
+ emit(record);
}
}
// Method for pulling new training data
- private Tuple1 getTrainingData() throws InterruptedException {
+ private Tuple getTrainingData() throws InterruptedException {
return new Tuple1(1);
}
}
// Task for building up-to-date partial models on new training data
- public static class PartialModelBuilder extends MapFunction, Tuple1> {
+ public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
- public Tuple1 map(Tuple1 inTuple) throws Exception {
- return buildPartialModel(inTuple);
+ public void invoke(StreamRecord record) throws Exception {
+ emit(buildPartialModel(record));
}
// Method for building partial model on the grouped training data
- protected Tuple1 buildPartialModel(Tuple1 inTuple) {
- return new Tuple1(1);
+ protected StreamRecord buildPartialModel(StreamRecord record) {
+ return new StreamRecord(new Tuple1(1));
}
}
// Task for performing prediction using the model produced in
// batch-processing and the up-to-date partial model
- public static class Predictor extends MapFunction, Tuple1> {
+ public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
- Tuple1 batchModel = null;
- Tuple1 partialModel = null;
+ StreamRecord batchModel = null;
+ StreamRecord partialModel = null;
@Override
- public Tuple1 map(Tuple1 inTuple) throws Exception {
- if (isModel(inTuple)) {
- partialModel = inTuple;
+ public void invoke(StreamRecord record) throws Exception {
+ if (isModel(record)) {
+ partialModel = record;
batchModel = getBatchModel();
- return null; //TODO: fix
} else {
- return predict(inTuple);
+ emit(predict(record));
}
}
// Pulls model built with batch-job on the old training data
- protected Tuple1 getBatchModel() {
- return new Tuple1(1);
+ protected StreamRecord getBatchModel() {
+ return new StreamRecord(new Tuple1(1));
}
// Checks whether the record is a model or a new data
- protected boolean isModel(Tuple1 inTuple) {
+ protected boolean isModel(StreamRecord record) {
return true;
}
// Performs prediction using the two models
- protected Tuple1 predict(Tuple1 inTuple) {
- return new Tuple1(0);
+ protected StreamRecord predict(StreamRecord record) {
+ return new StreamRecord(new Tuple1(0));
}
}
- public static class IMLSink extends SinkFunction> {
+ public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple1 inTuple) {
+ public void invoke(StreamRecord record) throws Exception {
// do nothing
}
}
+ private static JobGraph getJobGraph() throws Exception {
+ JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
+
+ graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
+ graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
+ graphBuilder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1);
+ graphBuilder.setTask("Predictor", Predictor.class, 1, 1);
+ graphBuilder.setSink("Sink", Sink.class, 1, 1);
+
+ graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
+ graphBuilder.shuffleConnect("NewData", "Predictor");
+ graphBuilder.broadcastConnect("PartialModelBuilder", "Predictor");
+ graphBuilder.shuffleConnect("Predictor", "Sink");
+
+ return graphBuilder.getJobGraph();
+ }
+
public static void main(String[] args) {
- StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+ // set logging parameters for local run
+ LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
- DataStream> model =
- env.addSource(new TrainingDataSource())
- .map(new PartialModelBuilder())
- .broadcast();
-
- DataStream> prediction =
- env.addSource(new NewDataSource())
- .connectWith(model)
- .map(new Predictor())
- .addSink(new IMLSink());
+ try {
+
+ // generate JobGraph
+ JobGraph jG = getJobGraph();
+ Configuration configuration = jG.getJobConfiguration();
+
+ if (args.length == 0 || args[0].equals("local")) {
+ System.out.println("Running in Local mode");
+ // start local cluster and submit JobGraph
+ NepheleMiniCluster exec = new NepheleMiniCluster();
+ exec.start();
+
+ Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
+
+ client.run(jG, true);
+
+ exec.stop();
+ } else if (args[0].equals("cluster")) {
+ System.out.println("Running in Cluster mode");
+ // submit JobGraph to the running cluster
+ Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
+ client.run(jG, true);
+ }
+
+ } catch (Exception e) {
+ System.out.println(e);
+ }
}
}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
index d84ad9f503a9dcae8eacfe10b35a998a838feb7c..db493fe720573d898740c58622ee5caade4ac42c 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
@@ -18,57 +18,73 @@ import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
+import org.apache.log4j.Level;
-import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.api.DataStream;
-import eu.stratosphere.streaming.api.SinkFunction;
-import eu.stratosphere.streaming.api.SourceFunction;
-import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
-import eu.stratosphere.util.Collector;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
+import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
+import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
+import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.streaming.util.ClusterUtil;
+import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
- public static class NewDataSource extends SourceFunction> {
+ public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
+
+ StreamRecord record = new StreamRecord(2, 1);
+
Random rnd = new Random();
@Override
- public void invoke(Collector> collector) throws Exception {
+ public void invoke() throws Exception {
+ record.initRecords();
while (true) {
// pull new record from data source
- collector.collect(getNewData());
+ record.setTuple(getNewData());
+ emit(record);
}
}
- private Tuple2 getNewData() throws InterruptedException {
+ private Tuple getNewData() throws InterruptedException {
return new Tuple2(false, new Double[] { rnd.nextDouble() * 3,
rnd.nextDouble() * 5 });
}
}
- public static class TrainingDataSource extends SourceFunction> {
+ public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
- // TODO: batch training data
private final int BATCH_SIZE = 1000;
+
+ StreamRecord record = new StreamRecord(2, BATCH_SIZE);
+
Random rnd = new Random();
@Override
- public void invoke(Collector> collector) throws Exception {
+ public void invoke() throws Exception {
+
+ record.initRecords();
while (true) {
- collector.collect(getTrainingData());
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ record.setTuple(i, getTrainingData());
+ }
+ emit(record);
}
}
- private Tuple2 getTrainingData() throws InterruptedException {
+ private Tuple getTrainingData() throws InterruptedException {
return new Tuple2(rnd.nextDouble() * 10, new Double[] {
rnd.nextDouble() * 3, rnd.nextDouble() * 5 });
@@ -76,29 +92,25 @@ public class IncrementalOLS {
}
}
- public static class PartialModelBuilder extends
- MapFunction, Tuple2> {
+ public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
- public Tuple2 map(Tuple2 inTuple) throws Exception {
- return buildPartialModel(inTuple);
+ public void invoke(StreamRecord record) throws Exception {
+ emit(buildPartialModel(record));
}
- // TODO: deal with batchsize
- protected Tuple2 buildPartialModel(Tuple2 inTuple) {
+ protected StreamRecord buildPartialModel(StreamRecord record) {
- // Integer numOfTuples = record.getNumOfTuples();
- Integer numOfTuples = 1;
- Integer numOfFeatures = ((Double[]) inTuple.getField(1)).length;
+ Integer numOfTuples = record.getNumOfTuples();
+ Integer numOfFeatures = ((Double[]) record.getField(1)).length;
double[][] x = new double[numOfTuples][numOfFeatures];
double[] y = new double[numOfTuples];
for (int i = 0; i < numOfTuples; i++) {
- // Tuple t = record.getTuple(i);
- Tuple t = inTuple;
+ Tuple t = record.getTuple(i);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
@@ -109,69 +121,90 @@ public class IncrementalOLS {
OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
ols.newSampleData(y, x);
- return new Tuple2(true, (Double[]) ArrayUtils.toObject(ols
- .estimateRegressionParameters()));
+ return new StreamRecord(new Tuple2(true,
+ (Double[]) ArrayUtils.toObject(ols.estimateRegressionParameters())));
}
}
- // TODO: How do I know the x for which I have predicted y?
- public static class Predictor extends MapFunction, Tuple1> {
+ public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
// StreamRecord batchModel = null;
Double[] partialModel = new Double[] { 0.0, 0.0 };
@Override
- public Tuple1 map(Tuple2 inTuple) throws Exception {
- if (isModel(inTuple)) {
- partialModel = inTuple.f1;
+ public void invoke(StreamRecord record) throws Exception {
+ if (isModel(record)) {
+ partialModel = (Double[]) record.getField(1);
// batchModel = getBatchModel();
- return null; //TODO: fix
} else {
- return predict(inTuple);
+ emit(predict(record));
}
}
- protected boolean isModel(Tuple2 inTuple) {
- return inTuple.f0;
+ // protected StreamRecord getBatchModel() {
+ // return new StreamRecord(new Tuple1(1));
+ // }
+
+ protected boolean isModel(StreamRecord record) {
+ return record.getBoolean(0);
}
- protected Tuple1 predict(Tuple2 inTuple) {
- Double[] x = inTuple.f1;
+ protected StreamRecord predict(StreamRecord record) {
+ Double[] x = (Double[]) record.getField(1);
Double prediction = 0.0;
for (int i = 0; i < x.length; i++) {
prediction = prediction + x[i] * partialModel[i];
}
- return new Tuple1(prediction);
+ return new StreamRecord(new Tuple1(prediction));
}
}
- public static class IncOLSSink extends SinkFunction> {
+ public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple1 inTuple) {
- System.out.println(inTuple);
+ public void invoke(StreamRecord record) throws Exception {
}
}
- public static void main(String[] args) {
+ private static JobGraph getJobGraph() {
+ JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
+ FaultToleranceType.NONE);
+
+ graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
+ graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
+ graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
+ graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
+ graphBuilder.setSink("Sink",new Sink(), 1, 1);
- StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+ graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
+ graphBuilder.shuffleConnect("NewData", "Predictor");
+ graphBuilder.broadcastConnect("PartialModelBuilder", "Predictor");
+ graphBuilder.shuffleConnect("Predictor", "Sink");
- DataStream> model =
- env.addSource(new TrainingDataSource())
- .map(new PartialModelBuilder())
- .broadcast();
+ return graphBuilder.getJobGraph();
+ }
+
+ public static void main(String[] args) {
+
+ // set logging parameters for local run
- DataStream> prediction =
- env.addSource(new NewDataSource())
- .connectWith(model)
- .map(new Predictor())
- .addSink(new IncOLSSink());
+ LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+
+ if (args.length == 0) {
+ args = new String[] { "local" };
+ }
+
+ if (args[0].equals("local")) {
+ ClusterUtil.runOnMiniCluster(getJobGraph());
+
+ } else if (args[0].equals("cluster")) {
+ ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
+ }
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java
index 80275268df30186d0113c061bae207b7429870a6..a18f2350aa06e146cbb143bed1aa1fd903e531fa 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java
@@ -15,72 +15,32 @@
package eu.stratosphere.streaming.examples.window.join;
-import java.net.InetSocketAddress;
-
import org.apache.log4j.Level;
-import eu.stratosphere.client.minicluster.NepheleMiniCluster;
-import eu.stratosphere.client.program.Client;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.streaming.api.JobGraphBuilder;
-import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.streaming.api.DataStream;
+import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
+import eu.stratosphere.streaming.examples.join.JoinSink;
import eu.stratosphere.streaming.util.LogUtils;
public class WindowJoinLocal {
- public static JobGraph getJobGraph() {
- JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
- graphBuilder.setSource("WindowJoinSourceOne", WindowJoinSourceOne.class);
- graphBuilder.setSource("WindowJoinSourceTwo", WindowJoinSourceTwo.class);
- graphBuilder.setTask("WindowJoinTask", WindowJoinTask.class, 1, 1);
- graphBuilder.setSink("WindowJoinSink", WindowJoinSink.class);
-
- graphBuilder.fieldsConnect("WindowJoinSourceOne", "WindowJoinTask", 1);
- graphBuilder.fieldsConnect("WindowJoinSourceTwo", "WindowJoinTask", 1);
- graphBuilder.shuffleConnect("WindowJoinTask", "WindowJoinSink");
-
- return graphBuilder.getJobGraph();
- }
-
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
- try {
-
- JobGraph jG = getJobGraph();
- Configuration configuration = jG.getJobConfiguration();
-
- if (args.length == 0) {
- args = new String[] { "local" };
- }
-
- if (args[0].equals("local")) {
- System.out.println("Running in Local mode");
- NepheleMiniCluster exec = new NepheleMiniCluster();
-
- exec.start();
-
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
-
- client.run(jG, true);
-
- exec.stop();
-
- } else if (args[0].equals("cluster")) {
- System.out.println("Running in Cluster2 mode");
-
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
- configuration);
+ StreamExecutionEnvironment context = new StreamExecutionEnvironment();
- client.run(jG, true);
+ DataStream> source1 = context
+ .addSource(new WindowJoinSourceOne());
- }
+ @SuppressWarnings("unused")
+ DataStream> source2 = context
+ .addSource(new WindowJoinSourceTwo()).connectWith(source1).partitionBy(1)
+ .flatMap(new WindowJoinTask()).addSink(new JoinSink());
- } catch (Exception e) {
- System.out.println(e);
- }
+ context.execute();
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java
index 9cf60202b8e7477be1f60535185f8c56602f87cb..1ae8ac969b0f77e14f9e2ba15cf29208a13ed2f3 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java
@@ -18,30 +18,28 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
-import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.api.SourceFunction;
+import eu.stratosphere.util.Collector;
-public class WindowJoinSourceOne extends UserSourceInvokable {
+public class WindowJoinSourceOne extends SourceFunction> {
private static final long serialVersionUID = 6670933703432267728L;
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
- "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
- "black", "peter" };
+ private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
- private StreamRecord outRecord = new StreamRecord(
- new Tuple4());
- private long progress = 0L;
+ private Tuple4 outRecord = new Tuple4();
+ private Long progress = 0L;
@Override
- public void invoke() throws Exception {
+ public void invoke(Collector> collector) throws Exception {
while (true) {
- outRecord.setString(0, "salary");
- outRecord.setString(1, names[rand.nextInt(names.length)]);
- outRecord.setInteger(2, rand.nextInt(10000));
- outRecord.setLong(3, progress);
- emit(outRecord);
- progress+=1;
+ outRecord.f0 = "salary";
+ outRecord.f1 = names[rand.nextInt(names.length)];
+ outRecord.f2 = rand.nextInt(10000);
+ outRecord.f3 = progress;
+ collector.collect(outRecord);
+ progress += 1;
}
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java
index 6116e801b89dd97923da09d2eefffa56a504070c..0a4070d805aa8ccdb1aa27acee23bc3ebd5f8157 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java
@@ -18,30 +18,28 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
-import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.api.SourceFunction;
+import eu.stratosphere.util.Collector;
-public class WindowJoinSourceTwo extends UserSourceInvokable {
+public class WindowJoinSourceTwo extends SourceFunction> {
private static final long serialVersionUID = -5897483980082089771L;
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
- "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
- "black", "peter" };
+ private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
- private StreamRecord outRecord = new StreamRecord(
- new Tuple4());
- private long progress = 0L;
+ private Tuple4 outRecord = new Tuple4();
+ private Long progress = 0L;
@Override
- public void invoke() throws Exception {
+ public void invoke(Collector> collector) throws Exception {
while (true) {
- outRecord.setString(0, "grade");
- outRecord.setString(1, names[rand.nextInt(names.length)]);
- outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A')));
- outRecord.setLong(3, progress);
- emit(outRecord);
- progress+=1;
+ outRecord.f0 = "grade";
+ outRecord.f1 = names[rand.nextInt(names.length)];
+ outRecord.f2 = rand.nextInt(5) + 1;
+ outRecord.f3 = progress;
+ collector.collect(outRecord);
+ progress += 1;
}
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java
index a88148a7f57d27bef5f72ebe9651c12297b8e881..5f515680f034a30083a9a8532045eba41cf5d664 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java
@@ -19,37 +19,38 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.util.Collector;
-public class WindowJoinTask extends UserTaskInvokable {
+public class WindowJoinTask extends
+ FlatMapFunction, Tuple3> {
class SalaryProgress {
- public SalaryProgress(int salary, long progress) {
+ public SalaryProgress(Integer salary, Long progress) {
this.salary = salary;
this.progress = progress;
}
- int salary;
- long progress;
+ Integer salary;
+ Long progress;
}
class GradeProgress {
- public GradeProgress(String grade, long progress) {
+ public GradeProgress(Integer grade, Long progress) {
this.grade = grade;
this.progress = progress;
}
- String grade;
- long progress;
+ Integer grade;
+ Long progress;
}
private static final long serialVersionUID = 749913336259789039L;
private int windowSize = 100;
private HashMap> gradeHashmap;
private HashMap> salaryHashmap;
- private StreamRecord outRecord = new StreamRecord(3);
public WindowJoinTask() {
gradeHashmap = new HashMap>();
@@ -57,59 +58,49 @@ public class WindowJoinTask extends UserTaskInvokable {
}
@Override
- public void invoke(StreamRecord record) throws Exception {
- // TODO Auto-generated method stub
- String streamId = record.getString(0);
- String name = record.getString(1);
- long progress = record.getLong(3);
+ public void flatMap(Tuple4 value,
+ Collector> out) throws Exception {
+ String streamId = value.f0;
+ String name = value.f1;
+ Long progress = value.f3;
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
- Iterator iterator = salaryHashmap.get(name)
- .iterator();
+ Iterator iterator = salaryHashmap.get(name).iterator();
while (iterator.hasNext()) {
SalaryProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
- Tuple3 outputTuple = new Tuple3(
- name, record.getString(2), entry.salary);
- outRecord.addTuple(outputTuple);
+ Tuple3 outputTuple = new Tuple3(
+ name, value.f2, entry.salary);
+ out.collect(outputTuple);
}
}
- if (outRecord.getNumOfTuples() != 0) {
- emit(outRecord);
+ if (!gradeHashmap.containsKey(name)) {
+ gradeHashmap.put(name, new LinkedList());
}
- outRecord.Clear();
- }
- if (!gradeHashmap.containsKey(name)) {
- gradeHashmap.put(name, new LinkedList());
- }
- gradeHashmap.get(name).add(
- new GradeProgress(record.getString(2), progress));
- } else {
- if (gradeHashmap.containsKey(name)) {
- Iterator iterator = gradeHashmap.get(name)
- .iterator();
- while (iterator.hasNext()) {
- GradeProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- Tuple3 outputTuple = new Tuple3(
- name, entry.grade, record.getInteger(2));
- outRecord.addTuple(outputTuple);
+ gradeHashmap.get(name).add(new GradeProgress(value.f2, progress));
+ } else {
+ if (gradeHashmap.containsKey(name)) {
+ Iterator iterator = gradeHashmap.get(name).iterator();
+ while (iterator.hasNext()) {
+ GradeProgress entry = iterator.next();
+ if (progress - entry.progress > windowSize) {
+ iterator.remove();
+ } else {
+ Tuple3 outputTuple = new Tuple3(
+ name, entry.grade, value.f2);
+ out.collect(outputTuple);
+
+ }
}
}
- if (outRecord.getNumOfTuples() != 0) {
- emit(outRecord);
+ if (!salaryHashmap.containsKey(name)) {
+ salaryHashmap.put(name, new LinkedList());
}
- outRecord.Clear();
+ salaryHashmap.get(name).add(new SalaryProgress(value.f2, progress));
}
- if (!salaryHashmap.containsKey(name)) {
- salaryHashmap.put(name, new LinkedList());
- }
- salaryHashmap.get(name).add(
- new SalaryProgress(record.getInteger(2), progress));
+
}
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
index 5a7b55bfe92b9e4ffd71f6ef4af22aa443f7df26..e4a9cc1f73516d658b682bf49803ba19470f26f9 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
@@ -15,90 +15,87 @@
package eu.stratosphere.streaming.examples.window.sum;
+import java.util.ArrayList;
+
+import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
+import eu.stratosphere.util.Collector;
-public class WindowSumAggregate extends UserTaskInvokable {
+public class WindowSumAggregate extends
+ FlatMapFunction, Tuple2> {
private static final long serialVersionUID = 1L;
-
+
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
- private int windowFieldId = 1;
- private StreamRecord tempRecord;
- private SlidingWindowState window;
+ private ArrayList> tempTupleArray = null;
+ private Tuple2 outTuple = new Tuple2();
+ private SlidingWindowState window;
private MutableTableState sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
- private StreamRecord outRecord = new StreamRecord(
- new Tuple2());
-
public WindowSumAggregate() {
- window = new SlidingWindowState(windowSize, slidingStep,
+ window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState();
sum.put("sum", 0);
}
- private void incrementCompute(StreamRecord record) {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- int number = record.getInteger(i, 0);
+ private void incrementCompute(ArrayList> tupleArray) {
+ for (int i = 0; i < tupleArray.size(); ++i) {
+ int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") + number);
}
}
- private void decrementCompute(StreamRecord record) {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- int number = record.getInteger(i, 0);
+ private void decrementCompute(ArrayList> tupleArray) {
+ for (int i = 0; i < tupleArray.size(); ++i) {
+ int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") - number);
}
}
-
- private void produceRecord(long progress){
- outRecord.setInteger(0, sum.get("sum"));
- outRecord.setLong(1, progress);
- emit(outRecord);
- }
+ private void produceOutput(long progress, Collector> out){
+ outTuple.f0 = sum.get("sum");
+ outTuple.f1 = progress;
+ out.collect(outTuple);
+ }
+
@Override
- public void invoke(StreamRecord record) throws Exception {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- long progress = record.getLong(i, windowFieldId);
- if (initTimestamp == -1) {
- initTimestamp = progress;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new StreamRecord(record.getNumOfFields());
- } else {
- if (progress >= nextTimestamp) {
+ public void flatMap(Tuple2 value,
+ Collector> out) throws Exception {
+ // TODO Auto-generated method stub
+ long progress = value.f1;
+ if (initTimestamp == -1) {
+ initTimestamp = progress;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempTupleArray = new ArrayList>();
+ } else {
+ if (progress >= nextTimestamp) {
+ if (window.isFull()) {
+ ArrayList> expiredTupleArray = window.popFront();
+ incrementCompute(tempTupleArray);
+ decrementCompute(expiredTupleArray);
+ window.pushBack(tempTupleArray);
+ if (window.isEmittable()) {
+ produceOutput(progress, out);
+ }
+ } else {
+ incrementCompute(tempTupleArray);
+ window.pushBack(tempTupleArray);
if (window.isFull()) {
- StreamRecord expiredRecord = window.popFront();
- incrementCompute(tempRecord);
- decrementCompute(expiredRecord);
- window.pushBack(tempRecord);
- if (window.isEmittable()) {
- produceRecord(progress);
- }
- } else {
- incrementCompute(tempRecord);
- window.pushBack(tempRecord);
- if (window.isFull()) {
- produceRecord(progress);
- }
+ produceOutput(progress, out);
}
- initTimestamp = nextTimestamp;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new StreamRecord(record.getNumOfFields());
}
+ initTimestamp = nextTimestamp;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempTupleArray = new ArrayList>();
}
- tempRecord.addTuple(record.getTuple(i));
}
+ tempTupleArray.add(value);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
index 020ae031aa619e2f343843a579a832b7ad60165e..0f46d6dadcad0fb2ca2e001bbc3de9121d3af136 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
@@ -15,71 +15,21 @@
package eu.stratosphere.streaming.examples.window.sum;
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Level;
-
-import eu.stratosphere.client.minicluster.NepheleMiniCluster;
-import eu.stratosphere.client.program.Client;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.streaming.api.JobGraphBuilder;
-import eu.stratosphere.streaming.util.LogUtils;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.streaming.api.DataStream;
+import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowSumLocal {
-
- public static JobGraph getJobGraph() {
- JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
- graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
- graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
- graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
- graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
-
- graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
- graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
- graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
-
- return graphBuilder.getJobGraph();
- }
-
+
public static void main(String[] args) {
-
- LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
-
- try {
-
- JobGraph jG = getJobGraph();
- Configuration configuration = jG.getJobConfiguration();
-
- if (args.length == 0) {
- args = new String[] { "local" };
- }
-
- if (args[0].equals("local")) {
- System.out.println("Running in Local mode");
- NepheleMiniCluster exec = new NepheleMiniCluster();
-
- exec.start();
-
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
-
- client.run(jG, true);
-
- exec.stop();
-
- } else if (args[0].equals("cluster")) {
- System.out.println("Running in Cluster2 mode");
-
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
- configuration);
-
- client.run(jG, true);
-
- }
-
- } catch (Exception e) {
- System.out.println(e);
- }
-
+ StreamExecutionEnvironment context = new StreamExecutionEnvironment();
+ @SuppressWarnings("unused")
+ DataStream> dataStream = context
+ .addSource(new WindowSumSource())
+ .map(new WindowSumMultiple())
+ .flatMap(new WindowSumAggregate())
+ .addSink(new WindowSumSink());
+
+ context.execute();
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
index 4e366a2c38259f174a6e6b8f05240b27f2daefb0..e0e089de20198296158419fc1d2aaa2b04d98319 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
@@ -15,21 +15,19 @@
package eu.stratosphere.streaming.examples.window.sum;
+import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
-public class WindowSumMultiple extends UserTaskInvokable {
+public class WindowSumMultiple extends MapFunction, Tuple2> {
private static final long serialVersionUID = 1L;
- private StreamRecord outputRecord = new StreamRecord(new Tuple2());
-
+ private Tuple2 outTuple = new Tuple2();
+
@Override
- public void invoke(StreamRecord record) throws Exception {
- Integer number = record.getInteger(0);
- Long timestamp = record.getLong(1);
- outputRecord.setInteger(0, number+1);
- outputRecord.setLong(1, timestamp);
- emit(outputRecord);
+ public Tuple2 map(Tuple2 inTuple) throws Exception {
+ // TODO Auto-generated method stub
+ outTuple.f0 = inTuple.f0 * 2;
+ outTuple.f1 = inTuple.f1;
+ return outTuple;
}
}
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
index fd8034fe92cefbd40886c688439c7c4507d287ef..a96a7bec5b45433612bb61cc29ef808c0173c060 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
@@ -15,21 +15,15 @@
package eu.stratosphere.streaming.examples.window.sum;
-import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.streaming.api.SinkFunction;
-public class WindowSumSink extends UserSinkInvokable {
+public class WindowSumSink extends SinkFunction> {
private static final long serialVersionUID = 1L;
- private Integer sum = 0;
- private long timestamp = 0;
-
@Override
- public void invoke(StreamRecord record) throws Exception {
- sum = record.getInteger(0);
- timestamp = record.getLong(1);
- System.out.println("============================================");
- System.out.println(sum + " " + timestamp);
- System.out.println("============================================");
+ public void invoke(Tuple2 inTuple) {
+ // TODO Auto-generated method stub
+ System.out.println(inTuple);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
index 7dfe44a2d6d5e09f3c07fd56d9c100dd2c985314..c3b8999942f4c019f82a3d3a285d7fa1582cedc3 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
@@ -16,23 +16,23 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.api.SourceFunction;
+import eu.stratosphere.util.Collector;
-public class WindowSumSource extends UserSourceInvokable {
+public class WindowSumSource extends SourceFunction> {
private static final long serialVersionUID = 1L;
- private StreamRecord outRecord = new StreamRecord(
- new Tuple2());
+ private Tuple2 outRecord = new Tuple2();
private Long timestamp = 0L;
@Override
- public void invoke() throws Exception {
+ public void invoke(Collector> collector) throws Exception {
+ // TODO Auto-generated method stub
for (int i = 0; i < 1000; ++i) {
- outRecord.setInteger(0, i);
- outRecord.setLong(1, timestamp);
+ outRecord.f0 = i;
+ outRecord.f1 = timestamp;
+ collector.collect(outRecord);
timestamp++;
- emit(outRecord);
- }
+ }
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
index a2b8768553e5d11091735cdb4fb495f670cf4a64..0f559e96de3f40f2bb997a5e22dbf314ad3de43d 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
@@ -15,40 +15,41 @@
package eu.stratosphere.streaming.examples.window.wordcount;
+import java.util.ArrayList;
+
+import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
+import eu.stratosphere.util.Collector;
-public class WindowWordCountCounter extends UserTaskInvokable {
+public class WindowWordCountCounter extends
+ FlatMapFunction, Tuple3> {
private static final long serialVersionUID = 1L;
-
- private int windowSize=10;
- private int slidingStep=2;
- private int computeGranularity=1;
- private int windowFieldId=2;
- private StreamRecord tempRecord;
- private SlidingWindowState window;
+ private int windowSize = 10;
+ private int slidingStep = 2;
+ private int computeGranularity = 1;
+
+ private ArrayList> tempTupleArray = null;
+ private Tuple3 outTuple = new Tuple3();
+ private SlidingWindowState window;
private MutableTableState wordCounts;
- private long initTimestamp=-1;
- private long nextTimestamp=-1;
+ private long initTimestamp = -1;
+ private long nextTimestamp = -1;
private Long timestamp = 0L;
- private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
- window = new SlidingWindowState(windowSize, slidingStep,
+ window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState();
}
- private void incrementCompute(StreamRecord record) {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- String word = record.getString(i, 0);
+ private void incrementCompute(ArrayList> tupleArray) {
+ for (int i = 0; i < tupleArray.size(); ++i) {
+ String word = tupleArray.get(i).f0;
if (wordCounts.containsKey(word)) {
int count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
@@ -58,10 +59,9 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
- private void decrementCompute(StreamRecord record) {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- String word = record.getString(i, 0);
+ private void decrementCompute(ArrayList> tupleArray) {
+ for (int i = 0; i < tupleArray.size(); ++i) {
+ String word = tupleArray.get(i).f0;
int count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
@@ -71,51 +71,48 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
- private void produceRecord(long progress){
- outRecord.Clear();
- MutableTableStateIterator iterator = wordCounts
- .getIterator();
+ private void produceOutput(long progress, Collector> out) {
+ MutableTableStateIterator iterator = wordCounts.getIterator();
while (iterator.hasNext()) {
Tuple2 tuple = iterator.next();
- Tuple3 outputTuple = new Tuple3(
- (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
- outRecord.addTuple(outputTuple);
+ outTuple.f0 = tuple.f0;
+ outTuple.f1 = tuple.f1;
+ outTuple.f2 = timestamp;
+ out.collect(outTuple);
}
- emit(outRecord);
}
-
+
@Override
- public void invoke(StreamRecord record) throws Exception {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- long progress = record.getLong(i, windowFieldId);
- if (initTimestamp == -1) {
- initTimestamp = progress;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new StreamRecord(record.getNumOfFields());
- } else {
- if (progress >= nextTimestamp) {
+ public void flatMap(Tuple2 value,
+ Collector> out) throws Exception {
+ // TODO Auto-generated method stub
+ timestamp = value.f1;
+ if (initTimestamp == -1) {
+ initTimestamp = timestamp;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempTupleArray = new ArrayList>();
+ } else {
+ if (timestamp >= nextTimestamp) {
+ if (window.isFull()) {
+ ArrayList> expiredTupleArray = window.popFront();
+ incrementCompute(tempTupleArray);
+ decrementCompute(expiredTupleArray);
+ window.pushBack(tempTupleArray);
+ if (window.isEmittable()) {
+ produceOutput(timestamp, out);
+ }
+ } else {
+ incrementCompute(tempTupleArray);
+ window.pushBack(tempTupleArray);
if (window.isFull()) {
- StreamRecord expiredRecord = window.popFront();
- incrementCompute(tempRecord);
- decrementCompute(expiredRecord);
- window.pushBack(tempRecord);
- if (window.isEmittable()) {
- produceRecord(progress);
- }
- } else {
- incrementCompute(tempRecord);
- window.pushBack(tempRecord);
- if (window.isFull()) {
- produceRecord(progress);
- }
+ produceOutput(timestamp, out);
}
- initTimestamp = nextTimestamp;
- nextTimestamp = initTimestamp + computeGranularity;
- tempRecord = new StreamRecord(record.getNumOfFields());
}
+ initTimestamp = nextTimestamp;
+ nextTimestamp = initTimestamp + computeGranularity;
+ tempTupleArray = new ArrayList>();
}
- tempRecord.addTuple(record.getTuple(i));
}
+ tempTupleArray.add(value);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java
index c240c609494a2b3a63e0364d05627820e0e39cfc..d8ae6a5a5c8a42642f755563f3c4313c73067f74 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java
@@ -15,72 +15,22 @@
package eu.stratosphere.streaming.examples.window.wordcount;
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Level;
-
-import eu.stratosphere.client.minicluster.NepheleMiniCluster;
-import eu.stratosphere.client.program.Client;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.streaming.api.JobGraphBuilder;
-import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
-import eu.stratosphere.streaming.util.LogUtils;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.streaming.api.DataStream;
+import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowWordCountLocal {
- public static JobGraph getJobGraph() {
- JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
- graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
- graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
- graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
- graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
-
- graphBuilder.shuffleConnect("WindowWordCountSource", "WindowWordCountSplitter");
- graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0);
- graphBuilder.shuffleConnect("WindowWordCountCounter", "WindowWordCountSink");
-
- return graphBuilder.getJobGraph();
- }
-
public static void main(String[] args) {
-
- LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
-
- try {
-
- JobGraph jG = getJobGraph();
- Configuration configuration = jG.getJobConfiguration();
-
- if (args.length == 0) {
- args = new String[] { "local" };
- }
-
- if (args[0].equals("local")) {
- System.out.println("Running in Local mode");
- NepheleMiniCluster exec = new NepheleMiniCluster();
-
- exec.start();
-
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
-
- client.run(jG, true);
-
- exec.stop();
-
- } else if (args[0].equals("cluster")) {
- System.out.println("Running in Cluster2 mode");
-
- Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
- configuration);
-
- client.run(jG, true);
-
- }
-
- } catch (Exception e) {
- System.out.println(e);
- }
-
+ StreamExecutionEnvironment context = new StreamExecutionEnvironment();
+ @SuppressWarnings("unused")
+ DataStream> dataStream = context
+ .addSource(new WindowWordCountSource())
+ .flatMap(new WindowWordCountSplitter())
+ .partitionBy(0)
+ .flatMap(new WindowWordCountCounter())
+ .addSink(new WindowWordCountSink());
+
+ context.execute();
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java
index 0a97626fee93e421821f2b89667d14307060de77..e571c90dc15c9af2e933e8cf9b2ad3d0e4a2519b 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java
@@ -15,26 +15,15 @@
package eu.stratosphere.streaming.examples.window.wordcount;
-import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.streaming.api.SinkFunction;
-public class WindowWordCountSink extends UserSinkInvokable {
+public class WindowWordCountSink extends SinkFunction> {
private static final long serialVersionUID = 1L;
- private String word = "";
- private Integer count = 0;
- private Long timestamp = 0L;
-
@Override
- public void invoke(StreamRecord record) throws Exception {
- int numTuple = record.getNumOfTuples();
- for (int i = 0; i < numTuple; ++i) {
- word = record.getString(i, 0);
- count = record.getInteger(i, 1);
- timestamp = record.getLong(i, 2);
- System.out.println("============================================");
- System.out.println(word + " " + count + " " + timestamp);
- System.out.println("============================================");
- }
+ public void invoke(Tuple3 inTuple) {
+ // TODO Auto-generated method stubs
+ System.out.println(inTuple);
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java
index 0a244a4b2285efc2a35b1372ee10c09f054315a7..e2cbcfee173f1a9c47a717e66a5ebbce62e074ac 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java
@@ -16,32 +16,23 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.io.BufferedReader;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.streaming.api.SourceFunction;
+import eu.stratosphere.util.Collector;
-public class WindowWordCountSource extends UserSourceInvokable {
+public class WindowWordCountSource extends SourceFunction> {
private static final long serialVersionUID = 1L;
- private BufferedReader br = null;
private String line = "";
- private StreamRecord outRecord = new StreamRecord(new Tuple2());
+ private Tuple2 outRecord = new Tuple2();
private Long timestamp = 0L;
- public WindowWordCountSource() {
- try {
- br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- }
- timestamp = 0L;
- }
-
@Override
- public void invoke() throws Exception {
+ public void invoke(Collector> collector) throws Exception {
+ // TODO Auto-generated method stub
+ BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
while(true){
line = br.readLine();
if(line==null){
@@ -49,11 +40,11 @@ public class WindowWordCountSource extends UserSourceInvokable {
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
- outRecord.setString(0, line);
- outRecord.setLong(1, timestamp);
+ outRecord.f0 = line;
+ outRecord.f1 = timestamp;
+ collector.collect(outRecord);
timestamp++;
- emit(outRecord);
}
- }
+ }
}
}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
index 17f6dbe1c3377118d7cc4a05f29d6ef5033dd052..538cb783882d0392ee9e0f4e6754505ae2bf0d69 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
@@ -15,26 +15,26 @@
package eu.stratosphere.streaming.examples.window.wordcount;
-import eu.stratosphere.api.java.tuple.Tuple3;
-import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
-import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
-public class WindowWordCountSplitter extends UserTaskInvokable {
+public class WindowWordCountSplitter extends FlatMapFunction, Tuple2> {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Long timestamp = 0L;
- private StreamRecord outputRecord = new StreamRecord(3);
+ private Tuple2 outTuple = new Tuple2();
@Override
- public void invoke(StreamRecord record) throws Exception {
- outputRecord.Clear();
- words = record.getString(0).split(" ");
- timestamp = record.getLong(1);
- for (String word : words) {
- Tuple3 tuple =new Tuple3(word, 1, timestamp);
- outputRecord.addTuple(tuple);
+ public void flatMap(Tuple2 inTuple, Collector