Commit 2e3606c0 authored by Yingjun Wu, committed by Stephan Ewen

[streaming] adapt to new APIs and modify iterative algorithms

Parent 28ba85b5
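
The recurring change in this diff is a migration from the prototype invokable API (UserTaskInvokable and StreamRecord, with an implicit emit) to the Java-API function classes (FlatMapFunction, MapFunction, SourceFunction, SinkFunction) that operate on typed tuples and an explicit Collector. A minimal before/after sketch of that migration; MyOperator is a hypothetical name and the snippet is illustrative only, not part of the commit:

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;

// Old style (removed in the hunks below): subclass UserTaskInvokable, read fields
// from a StreamRecord and emit another StreamRecord from inside the operator.
//
//   public class MyOperator extends UserTaskInvokable {
//       @Override
//       public void invoke(StreamRecord record) throws Exception {
//           emit(record);
//       }
//   }

// New style (added in the hunks below): extend a typed function class and hand
// results to the Collector supplied by the runtime.
public class MyOperator extends FlatMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(Tuple2<Integer, Long> value, Collector<Tuple2<Integer, Long>> out) throws Exception {
        out.collect(value); // pass-through; a real operator would transform value here
    }
}
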
@@ -87,7 +87,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
public abstract void invoke(IN tuple);
}
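
For reference, a concrete sink under this new API only needs to override invoke. A minimal sketch, assuming the imports shown; PrintSink is a hypothetical name, and the sinks added later in this commit follow the same shape:

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;

// Hypothetical example of the abstract SinkFunction above: print every incoming tuple.
public class PrintSink extends SinkFunction<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Tuple1<String> tuple) {
        System.out.println(tuple);
    }
}
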
@@ -93,7 +93,6 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
@Override
public void close() {
// TODO Auto-generated method stub
}
@@ -25,7 +25,6 @@ public class DefaultTaskInvokable extends UserTaskInvokable<Tuple, Tuple> {
@Override
public void invoke(StreamRecord record, StreamCollector<Tuple> collector) throws Exception {
// TODO Auto-generated method stub
}
@@ -15,21 +15,22 @@
package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class StreamWindowTask extends UserTaskInvokable {
public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
private int computeGranularity;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private SlidingWindowState<Integer> window;
private ArrayList tempArrayList;
private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
@@ -38,49 +39,47 @@ public class StreamWindowTask extends UserTaskInvokable {
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record){}
private void decrementCompute(StreamRecord record){}
private void produceRecord(long progress){}
private void incrementCompute(ArrayList tupleArray) {}
private void decrementCompute(ArrayList tupleArray) {}
private void produceOutput(long progress, Collector out) {}
@Override
public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
int numTuple = record.getBatchSize();
int tupleIndex = 0;
for (int i = 0; i < numTuple; ++i) {
long progress = record.getTuple(i).getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new ArrayStreamRecord(record.getBatchSize());
} else {
if (progress > nextTimestamp) {
public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
long progress = value.getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
} else {
if (progress > nextTimestamp) {
if (window.isFull()) {
ArrayList expiredArrayList = window.popFront();
incrementCompute(tempArrayList);
decrementCompute(expiredArrayList);
window.pushBack(tempArrayList);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempArrayList);
window.pushBack(tempArrayList);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(progress, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new ArrayStreamRecord(record.getBatchSize());
}
tempRecord.setTuple(tupleIndex++, record.getTuple(i));
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
}
}
tempArrayList.add(value);
}
}
}
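
The three hooks above (incrementCompute, decrementCompute, produceOutput) are intentionally left empty in this skeleton. A sketch of how a concrete windowed sum might fill them in, assuming the value to aggregate sits in field 0 of each buffered tuple and that Tuple2 from eu.stratosphere.api.java.tuple is imported; WindowSumAggregate further down in this commit does the same with typed tuples:

// Sketch only, not part of the commit: drop-in bodies for the empty hooks of a windowed sum.
private void incrementCompute(ArrayList tupleArray) {
    for (Object o : tupleArray) {
        int number = ((Tuple) o).getField(0); // assumes field 0 holds the value to sum
        sum.put("sum", sum.get("sum") + number);
    }
}

private void decrementCompute(ArrayList tupleArray) {
    for (Object o : tupleArray) {
        int number = ((Tuple) o).getField(0);
        sum.put("sum", sum.get("sum") - number);
    }
}

private void produceOutput(long progress, Collector out) {
    // emit the running sum together with the timestamp that closed the window slice
    out.collect(new Tuple2<Integer, Long>(sum.get("sum"), progress));
}
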
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class CollaborativeFilteringLocal {
public static void main(String[] args) {
}
}
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
public class CollaborativeFilteringMap {
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.util.HashMap;
import org.jblas.DoubleMatrix;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CollaborativeFilteringReduce extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
HashMap<Integer, Integer> rowIndex=new HashMap<Integer, Integer>();
HashMap<Integer, Integer> columnIndex=new HashMap<Integer, Integer>();
DoubleMatrix userItem=new DoubleMatrix(1000, 2000);
DoubleMatrix coOccurence=new DoubleMatrix(2000, 2000);
@Override
public void invoke(StreamRecord record) throws Exception {
int userId = record.getInteger(0, 0);
int itemId = record.getInteger(0, 1);
int rating = record.getInteger(0, 2);
if(!rowIndex.containsKey(userId)){
rowIndex.put(userId, rowIndex.size());
}
if(!columnIndex.containsKey(itemId)){
columnIndex.put(itemId, columnIndex.size());
}
userItem.put(rowIndex.get(userId), columnIndex.get(itemId), rating);
//outRecord.setString(0, line);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SinkFunction;
public class CollaborativeFilteringSink extends SinkFunction<Tuple4<Integer, Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple4<Integer, Integer, Integer, Long> inTuple) {
System.out.println(inTuple);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class CollaborativeFilteringSource extends SourceFunction<Tuple4<Integer, Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
private String line = "";
private Tuple4<Integer, Integer, Integer, Long> outRecord = new Tuple4<Integer, Integer, Integer, Long>();
private Long timestamp = 0L;
@Override
public void invoke(
Collector<Tuple4<Integer, Integer, Integer, Long>> collector)
throws Exception {
BufferedReader br = new BufferedReader(new FileReader(
"src/test/resources/testdata/MovieLens100k.data"));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (!line.isEmpty()) {
String[] items=line.split("\t");
outRecord.f0 = Integer.valueOf(items[0]);
outRecord.f1 = Integer.valueOf(items[1]);
outRecord.f2 = Integer.valueOf(items[2]);
outRecord.f3 = timestamp;
collector.collect(outRecord);
timestamp++;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class KMeansLocal {
public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = context
.addSource(new KMeansSource(2, 2, 1, 5))
.addFixPoint(new KMeansMap(), new KMeansReduce(), 20)
.addSink(new KMeansSink());
context.execute();
}
}
package eu.stratosphere.streaming.examples.iterative.kmeans;
public class KMeansMap {
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class KMeansReduce extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private double[] point=null;
public KMeansReduce(int dimension){
point = new double[dimension];
}
@Override
public void invoke(StreamRecord record) throws Exception {
String[] pointStr = record.getString(0, 0).split(" ");
for(int i=0; i<pointStr.length; ++i){
point[i]=Double.valueOf(pointStr[i]);
}
outRecord.setString(0, record.getString(0, 0));
emit(outRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class KMeansSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple3<Integer, Integer, Long> tuple) {
System.out.println(tuple);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class KMeansSource extends SourceFunction<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private static final long DEFAULT_SEED = 4650285087650871364L;
private Random random = new Random(DEFAULT_SEED);
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private int numCenter;
private int dimension;
private double absoluteStdDev;
private double range;
private StringBuilder buffer = new StringBuilder();
public KMeansSource(int numCenter, int dimension, double stddev, double range){
this.numCenter=numCenter;
this.dimension=dimension;
this.absoluteStdDev = stddev * range;
this.range=range;
}
@Override
public void invoke(Collector<Tuple2<String, Long>> collector)
throws Exception {
double[][] means = uniformRandomCenters(random, numCenter, dimension, range);
double[] point = new double[dimension];
int nextCentroid = 0;
while (true) {
// generate a point for the current centroid
double[] centroid = means[nextCentroid];
for (int d = 0; d < dimension; d++) {
point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
}
nextCentroid = (nextCentroid + 1) % numCenter;
String pointString=generatePointString(point);
outRecord.f0 = pointString;
collector.collect(outRecord);
}
}
private double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
final double halfRange = range / 2;
final double[][] points = new double[num][dimensionality];
for (int i = 0; i < num; i++) {
for (int dim = 0; dim < dimensionality; dim ++) {
points[i][dim] = (rnd.nextDouble() * range) - halfRange;
}
}
return points;
}
private String generatePointString(double[] point){
buffer.setLength(0);
for (int j = 0; j < dimension; j++) {
buffer.append(point[j]);
if(j < dimension - 1) {
buffer.append(" ");
}
}
return buffer.toString();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class PageRankLocal {
public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = context
.addSource(new PageRankSource())
.map(new Counter())
.addSink(new PageRankSink());
context.execute();
}
}
package eu.stratosphere.streaming.examples.iterative.pagerank;
public class PageRankMap {
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class PageRankReduce extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private Graph linkGraph = new Graph();
@Override
public void invoke(StreamRecord record) throws Exception {
Integer sourceNode = record.getInteger(0, 0);
Integer targetNode = record.getInteger(0, 1);
// set the input graph.
linkGraph.insertDirectedEdge(sourceNode, targetNode);
//outRecord.setString(0, line);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class PageRankSink extends SinkFunction<Tuple3<Integer, Float, Long>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple3<Integer, Float, Long> tuple) {
System.out.println(tuple);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class PageRankSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
private Tuple3<Integer, Integer, Long> outRecord = new Tuple3<Integer, Integer, Long>();
private Long timestamp = 0L;
@Override
public void invoke(Collector<Tuple3<Integer, Integer, Long>> collector)
throws Exception {
BufferedReader br = new BufferedReader(new FileReader(
"src/test/resources/testdata/ASTopology.data"));
while (true) {
String line = br.readLine();
if (line == null) {
break;
}
if (!line.isEmpty()) {
String[] link=line.split(":");
outRecord.f0 = Integer.valueOf(link[0]);
outRecord.f1 = Integer.valueOf(link[1]);
outRecord.f2 = timestamp;
collector.collect(outRecord);
timestamp += 1;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class SSSPLocal {
public static void main(String[] args) {
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = context
.addSource(new SSSPSource())
.flatMap(new SSSPCounter())
.addSink(new SSSPSink());
context.execute();
}
}
@@ -13,80 +13,27 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
package eu.stratosphere.streaming.examples.iterative.sssp;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.GraphState;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The log-structured key-value store that accepts any modification operation by
* appending the value to the end of the state.
*/
public class LogTableState<K, V> implements TableState<K, V> {
private HashMap<K, IndexPair> hashMap = new HashMap<K, IndexPair>();
private HashMap<Integer, ArrayList<V>> blockList = new HashMap<Integer, ArrayList<V>>();
private final int perBlockEntryCount = 1000;
private IndexPair nextInsertPos = new IndexPair(-1, -1);
public LogTableState() {
blockList.put(0, new ArrayList<V>());
nextInsertPos.setIndexPair(0, 0);
}
@Override
public void put(K key, V value) {
// TODO Auto-generated method stub
if (nextInsertPos.entryId == perBlockEntryCount) {
blockList.put(nextInsertPos.blockId + 1, new ArrayList<V>());
nextInsertPos.IncrementBlock();
}
blockList.get(nextInsertPos.blockId).add(value);
hashMap.put(key, new IndexPair(nextInsertPos));
nextInsertPos.entryId += 1;
}
@Override
public V get(K key) {
// TODO Auto-generated method stub
IndexPair index = hashMap.get(key);
if (index == null) {
return null;
} else {
return blockList.get(index.blockId).get(index.entryId);
}
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
hashMap.remove(key);
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return hashMap.containsKey(key);
}
@Override
public String serialize() {
// TODO Auto-generated method stub
return null;
}
@Override
public void deserialize(String str) {
// TODO Auto-generated method stub
}
public class SSSPMap extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private GraphState linkGraph = new GraphState();
@Override
public TableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new LogTableStateIterator<K, V>(hashMap.entrySet().iterator(), blockList);
public void invoke(StreamRecord record) throws Exception {
Integer sourceNode = record.getInteger(0, 0);
Integer targetNode = record.getInteger(0, 1);
// set the input graph.
linkGraph.insertDirectedEdge(sourceNode, targetNode);
//outRecord.setString(0, line);
}
}
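
The Javadoc above describes the removed LogTableState as append-only: every put writes the value to the tail of a block list and repoints the key's index, while delete only drops the index entry. A short usage sketch under that contract, assuming LogTableState from eu.stratosphere.streaming.state is on the classpath; illustrative only, not part of the diff:

// Sketch: exercising the log-structured contract described above.
public static void logTableStateExample() {
    LogTableState<String, Integer> state = new LogTableState<String, Integer>();
    state.put("a", 1);
    state.put("a", 2);               // appends a second entry; the index for "a" moves to it
    Integer latest = state.get("a"); // returns 2, the most recently written value
    state.delete("a");               // drops only the index entry; the appended values stay in the log
    System.out.println(latest + ", containsKey after delete: " + state.containsKey("a"));
}
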
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.GraphState;
public class SSSPReduce extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private GraphState linkGraph = new GraphState();
@Override
public void invoke(StreamRecord record) throws Exception {
Integer sourceNode = record.getInteger(0, 0);
Integer targetNode = record.getInteger(0, 1);
// set the input graph.
linkGraph.insertDirectedEdge(sourceNode, targetNode);
//outRecord.setString(0, line);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class SSSPSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple3<Integer, Integer, Long> tuple) {
System.out.println(tuple);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class SSSPSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
private Tuple3<Integer, Integer, Long> outRecord = new Tuple3<Integer, Integer, Long>();
private Long timestamp = 0L;
@Override
public void invoke(Collector<Tuple3<Integer, Integer, Long>> collector)
throws Exception {
BufferedReader br = new BufferedReader(new FileReader(
"src/test/resources/testdata/ASTopology.data"));
while (true) {
String line = br.readLine();
if (line == null) {
break;
}
if (!line.isEmpty()) {
String[] link=line.split(":");
outRecord.f0 = Integer.valueOf(link[0]);
outRecord.f1 = Integer.valueOf(link[1]);
outRecord.f2 = timestamp;
collector.collect(outRecord);
timestamp += 1;
}
}
}
}
@@ -14,134 +14,188 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
// Source for feeding new data for prediction
public static class NewDataSource extends SourceFunction<Tuple1<Integer>> {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>(1));
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
public void invoke() throws Exception {
while (true) {
collector.collect(getNewData());
record.setTuple(getNewData());
emit(record);
}
}
// Method for pulling new data for prediction
private Tuple1<Integer> getNewData() throws InterruptedException {
private Tuple getNewData() throws InterruptedException {
return new Tuple1<Integer>(1);
}
}
// Source for feeding new training data for partial model building
public static class TrainingDataSource extends SourceFunction<Tuple1<Integer>> {
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// Number of tuples grouped for building partial model
// TODO: batch training data
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(1, BATCH_SIZE);
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
// Group the predefined number of records in a streamrecord then
// emit for model building
collector.collect(getTrainingData());
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
emit(record);
}
}
// Method for pulling new training data
private Tuple1<Integer> getTrainingData() throws InterruptedException {
private Tuple getTrainingData() throws InterruptedException {
return new Tuple1<Integer>(1);
}
}
// Task for building up-to-date partial models on new training data
public static class PartialModelBuilder extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public Tuple1<Integer> map(Tuple1<Integer> inTuple) throws Exception {
return buildPartialModel(inTuple);
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
}
// Method for building partial model on the grouped training data
protected Tuple1<Integer> buildPartialModel(Tuple1<Integer> inTuple) {
return new Tuple1<Integer>(1);
protected StreamRecord buildPartialModel(StreamRecord record) {
return new StreamRecord(new Tuple1<Integer>(1));
}
}
// Task for performing prediction using the model produced in
// batch-processing and the up-to-date partial model
public static class Predictor extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
Tuple1<Integer> batchModel = null;
Tuple1<Integer> partialModel = null;
StreamRecord batchModel = null;
StreamRecord partialModel = null;
@Override
public Tuple1<Integer> map(Tuple1<Integer> inTuple) throws Exception {
if (isModel(inTuple)) {
partialModel = inTuple;
public void invoke(StreamRecord record) throws Exception {
if (isModel(record)) {
partialModel = record;
batchModel = getBatchModel();
return null; //TODO: fix
} else {
return predict(inTuple);
emit(predict(record));
}
}
// Pulls model built with batch-job on the old training data
protected Tuple1<Integer> getBatchModel() {
return new Tuple1<Integer>(1);
protected StreamRecord getBatchModel() {
return new StreamRecord(new Tuple1<Integer>(1));
}
// Checks whether the record is a model or new data
protected boolean isModel(Tuple1<Integer> inTuple) {
protected boolean isModel(StreamRecord record) {
return true;
}
// Performs prediction using the two models
protected Tuple1<Integer> predict(Tuple1<Integer> inTuple) {
return new Tuple1<Integer>(0);
protected StreamRecord predict(StreamRecord record) {
return new StreamRecord(new Tuple1<Integer>(0));
}
}
public static class IMLSink extends SinkFunction<Tuple1<Integer>> {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Integer> inTuple) {
public void invoke(StreamRecord record) throws Exception {
// do nothing
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
graphBuilder.setSource("NewData", NewDataSource.class, 1, 1);
graphBuilder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
graphBuilder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1);
graphBuilder.setTask("Predictor", Predictor.class, 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor");
graphBuilder.broadcastConnect("PartialModelBuilder", "Predictor");
graphBuilder.shuffleConnect("Predictor", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
DataStream<Tuple1<Integer>> model =
env.addSource(new TrainingDataSource())
.map(new PartialModelBuilder())
.broadcast();
DataStream<Tuple1<Integer>> prediction =
env.addSource(new NewDataSource())
.connectWith(model)
.map(new Predictor())
.addSink(new IMLSink());
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
\ No newline at end of file
@@ -18,57 +18,73 @@ import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
public static class NewDataSource extends SourceFunction<Tuple2<Boolean, Double[]>> {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(2, 1);
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Boolean, Double[]>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
// pull new record from data source
collector.collect(getNewData());
record.setTuple(getNewData());
emit(record);
}
}
private Tuple2<Boolean, Double[]> getNewData() throws InterruptedException {
private Tuple getNewData() throws InterruptedException {
return new Tuple2<Boolean, Double[]>(false, new Double[] { rnd.nextDouble() * 3,
rnd.nextDouble() * 5 });
}
}
public static class TrainingDataSource extends SourceFunction<Tuple2<Double, Double[]>> {
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// TODO: batch training data
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Double, Double[]>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
collector.collect(getTrainingData());
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
emit(record);
}
}
private Tuple2<Double, Double[]> getTrainingData() throws InterruptedException {
private Tuple getTrainingData() throws InterruptedException {
return new Tuple2<Double, Double[]>(rnd.nextDouble() * 10, new Double[] {
rnd.nextDouble() * 3, rnd.nextDouble() * 5 });
@@ -76,29 +76,25 @@ public class IncrementalOLS {
}
}
public static class PartialModelBuilder extends
MapFunction<Tuple2<Double, Double[]>, Tuple2<Boolean, Double[]>> {
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Boolean, Double[]> map(Tuple2<Double, Double[]> inTuple) throws Exception {
return buildPartialModel(inTuple);
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
}
// TODO: deal with batchsize
protected Tuple2<Boolean, Double[]> buildPartialModel(Tuple2<Double, Double[]> inTuple) {
protected StreamRecord buildPartialModel(StreamRecord record) {
// Integer numOfTuples = record.getNumOfTuples();
Integer numOfTuples = 1;
Integer numOfFeatures = ((Double[]) inTuple.getField(1)).length;
Integer numOfTuples = record.getNumOfTuples();
Integer numOfFeatures = ((Double[]) record.getField(1)).length;
double[][] x = new double[numOfTuples][numOfFeatures];
double[] y = new double[numOfTuples];
for (int i = 0; i < numOfTuples; i++) {
// Tuple t = record.getTuple(i);
Tuple t = inTuple;
Tuple t = record.getTuple(i);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
@@ -109,69 +121,90 @@
OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
ols.newSampleData(y, x);
return new Tuple2<Boolean, Double[]>(true, (Double[]) ArrayUtils.toObject(ols
.estimateRegressionParameters()));
return new StreamRecord(new Tuple2<Boolean, Double[]>(true,
(Double[]) ArrayUtils.toObject(ols.estimateRegressionParameters())));
}
}
// TODO: How do I know the x for which I have predicted y?
public static class Predictor extends MapFunction<Tuple2<Boolean, Double[]>, Tuple1<Double>> {
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
// StreamRecord batchModel = null;
Double[] partialModel = new Double[] { 0.0, 0.0 };
@Override
public Tuple1<Double> map(Tuple2<Boolean, Double[]> inTuple) throws Exception {
if (isModel(inTuple)) {
partialModel = inTuple.f1;
public void invoke(StreamRecord record) throws Exception {
if (isModel(record)) {
partialModel = (Double[]) record.getField(1);
// batchModel = getBatchModel();
return null; //TODO: fix
} else {
return predict(inTuple);
emit(predict(record));
}
}
protected boolean isModel(Tuple2<Boolean, Double[]> inTuple) {
return inTuple.f0;
// protected StreamRecord getBatchModel() {
// return new StreamRecord(new Tuple1<Integer>(1));
// }
protected boolean isModel(StreamRecord record) {
return record.getBoolean(0);
}
protected Tuple1<Double> predict(Tuple2<Boolean, Double[]> inTuple) {
Double[] x = inTuple.f1;
protected StreamRecord predict(StreamRecord record) {
Double[] x = (Double[]) record.getField(1);
Double prediction = 0.0;
for (int i = 0; i < x.length; i++) {
prediction = prediction + x[i] * partialModel[i];
}
return new Tuple1<Double>(prediction);
return new StreamRecord(new Tuple1<Double>(prediction));
}
}
public static class IncOLSSink extends SinkFunction<Tuple1<Double>> {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> inTuple) {
System.out.println(inTuple);
public void invoke(StreamRecord record) throws Exception {
}
}
public static void main(String[] args) {
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
FaultToleranceType.NONE);
graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
graphBuilder.setSink("Sink",new Sink(), 1, 1);
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor");
graphBuilder.broadcastConnect("PartialModelBuilder", "Predictor");
graphBuilder.shuffleConnect("Predictor", "Sink");
DataStream<Tuple2<Boolean, Double[]>> model =
env.addSource(new TrainingDataSource())
.map(new PartialModelBuilder())
.broadcast();
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
// set logging parameters for local run
DataStream<Tuple1<Double>> prediction =
env.addSource(new NewDataSource())
.connectWith(model)
.map(new Predictor())
.addSink(new IncOLSSink());
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
}
}
}
@@ -15,90 +15,86 @@
package eu.stratosphere.streaming.examples.window.sum;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class WindowSumAggregate extends UserTaskInvokable {
public class WindowSumAggregate extends
FlatMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private SlidingWindowState<Integer> window;
private ArrayList<Tuple2<Integer, Long>> tempTupleArray = null;
private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
private void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") + number);
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
private void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") - number);
}
}
private void produceRecord(long progress){
outRecord.setInteger(0, sum.get("sum"));
outRecord.setLong(1, progress);
emit(outRecord);
}
private void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out){
outTuple.f0 = sum.get("sum");
outTuple.f1 = progress;
out.collect(outTuple);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
public void flatMap(Tuple2<Integer, Long> value,
Collector<Tuple2<Integer, Long>> out) throws Exception {
long progress = value.f1;
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<Integer, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(progress, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
}
tempRecord.addTuple(record.getTuple(i));
}
tempTupleArray.add(value);
}
}
@@ -15,71 +15,21 @@
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowSumLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<Integer, Long>> dataStream = context
.addSource(new WindowSumSource())
.map(new WindowSumMultiple())
.flatMap(new WindowSumAggregate())
.addSink(new WindowSumSink());
context.execute();
}
}
......@@ -15,21 +15,18 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
public class WindowSumMultiple extends MapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Long>());
private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
@Override
public void invoke(StreamRecord record) throws Exception {
Integer number = record.getInteger(0);
Long timestamp = record.getLong(1);
outputRecord.setInteger(0, number+1);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
public Tuple2<Integer, Long> map(Tuple2<Integer, Long> inTuple) throws Exception {
outTuple.f0 = inTuple.f0 * 2;
outTuple.f1 = inTuple.f1;
return outTuple;
}
}
\ No newline at end of file
......@@ -15,21 +15,14 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
public class WindowSumSink extends UserSinkInvokable {
public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private Integer sum = 0;
private long timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
sum = record.getInteger(0);
timestamp = record.getLong(1);
System.out.println("============================================");
System.out.println(sum + " " + timestamp);
System.out.println("============================================");
public void invoke(Tuple2<Integer, Long> inTuple) {
System.out.println(inTuple);
}
}
......@@ -16,23 +16,22 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowSumSource extends UserSourceInvokable {
public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
private Tuple2<Integer, Long> outRecord = new Tuple2<Integer, Long>();
private Long timestamp = 0L;
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple2<Integer, Long>> collector) throws Exception {
for (int i = 0; i < 1000; ++i) {
outRecord.setInteger(0, i);
outRecord.setLong(1, timestamp);
outRecord.f0 = i;
outRecord.f1 = timestamp;
collector.collect(outRecord);
timestamp++;
emit(outRecord);
}
}
}
}
......@@ -15,40 +15,41 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class WindowWordCountCounter extends UserTaskInvokable {
public class WindowWordCountCounter extends
FlatMapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize=10;
private int slidingStep=2;
private int computeGranularity=1;
private int windowFieldId=2;
private StreamRecord tempRecord;
private SlidingWindowState<Integer> window;
private int windowSize = 10;
private int slidingStep = 2;
private int computeGranularity = 1;
private ArrayList<Tuple2<String, Long>> tempTupleArray = null;
private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp=-1;
private long nextTimestamp=-1;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
private void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
if (wordCounts.containsKey(word)) {
int count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -58,10 +59,9 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
private void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
int count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
......@@ -71,51 +71,47 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void produceRecord(long progress){
outRecord.Clear();
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
private void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
MutableTableStateIterator<String, Integer> iterator = wordCounts.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
outTuple.f0 = tuple.f0;
outTuple.f1 = tuple.f1;
outTuple.f2 = timestamp;
out.collect(outTuple);
}
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple3<String, Integer, Long>> out) throws Exception {
timestamp = value.f1;
if (initTimestamp == -1) {
initTimestamp = timestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
} else {
if (timestamp >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<String, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(timestamp, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(timestamp, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
}
tempRecord.addTuple(record.getTuple(i));
}
tempTupleArray.add(value);
}
}
......@@ -15,72 +15,22 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.shuffleConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0);
graphBuilder.shuffleConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Long>> dataStream = context
.addSource(new WindowWordCountSource())
.flatMap(new WindowWordCountSplitter())
.partitionBy(0)
.flatMap(new WindowWordCountCounter())
.addSink(new WindowWordCountSink());
context.execute();
}
}
......@@ -15,26 +15,14 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class WindowWordCountSink extends UserSinkInvokable {
public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
public void invoke(Tuple3<String, Integer, Long> inTuple) {
System.out.println(inTuple);
}
}
......@@ -16,32 +16,22 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowWordCountSource extends UserSourceInvokable {
public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private Long timestamp = 0L;
public WindowWordCountSource() {
try {
br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple2<String, Long>> collector) throws Exception {
BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
while(true){
line = br.readLine();
if(line==null){
......@@ -49,11 +39,11 @@ public class WindowWordCountSource extends UserSourceInvokable {
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
outRecord.f0 = line;
outRecord.f1 = timestamp;
collector.collect(outRecord);
timestamp++;
emit(outRecord);
}
}
}
}
}
......@@ -15,26 +15,26 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;
import eu.stratosphere.api.java.functions.FlatMapFunction;
public class WindowWordCountSplitter extends UserTaskInvokable {
public class WindowWordCountSplitter extends FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Long timestamp = 0L;
private StreamRecord outputRecord = new StreamRecord(3);
private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
@Override
public void invoke(StreamRecord record) throws Exception {
outputRecord.Clear();
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
public void flatMap(Tuple2<String, Long> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
words=inTuple.f0.split(" ");
timestamp=inTuple.f1;
for(String word : words){
outTuple.f0 = word;
outTuple.f1 = timestamp;
out.collect(outTuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
......@@ -22,16 +22,16 @@ import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WordCountLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<String, Integer>> dataStream = env
DataStream<Tuple2<String, Integer>> dataStream = context
.readTextFile("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountSplitter())
.partitionBy(0)
.map(new WordCountCounter())
.addSink(new WordCountSink());
env.execute();
context.execute();
}
}
......@@ -22,6 +22,7 @@ import eu.stratosphere.util.Collector;
public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
......@@ -35,7 +36,8 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
words = inTuple.f0.split(" ");
for (String word : words) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer {
static kafka.javaapi.producer.Producer<Integer, String> producer;
static Properties props = new Properties();
public static void ProducerPrepare(String brokerAddr) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", brokerAddr);
producer = new kafka.javaapi.producer.Producer<Integer, String>(
new ProducerConfig(props));
}
public static void main(String[] args) throws Exception{
		if (args.length == 3) {
String infilename=args[0];
String topicId=args[1];
String brokerAddr=args[2];
ProducerPrepare(brokerAddr);
BufferedReader reader = new BufferedReader(new FileReader(infilename));
while (true) {
String line=reader.readLine();
if(line==null){
reader.close();
reader = new BufferedReader(new FileReader(infilename));
continue;
}
producer.send(new KeyedMessage<Integer, String>(
topicId, line));
}
}else{
System.out.println("please set filename!");
System.exit(-1);
}
}
}
......@@ -29,9 +29,8 @@ import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
/**
* Source for reading messages from a Kafka queue. The source currently only
* support string messages. Other types will be added soon.
*
 * Source for reading messages from a Kafka queue.
 * The source currently only supports string messages.
*/
public class KafkaSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
......
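For context, the sketch below shows how the KafkaSource above could be wired into the streaming API in the same style as the other examples in this commit. It is an illustration only, not part of the commit: the class name is made up, and the KafkaSource constructor arguments (broker/ZooKeeper address and topic) are assumptions, since that part of the file is collapsed in this diff; the import paths follow the packages shown elsewhere in the commit.

import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.kafka.KafkaSource;

// Hypothetical usage sketch -- the KafkaSource constructor arguments are assumed,
// since the corresponding hunk is collapsed in this diff.
public class KafkaConsumerSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment context = new StreamExecutionEnvironment();

		@SuppressWarnings("unused")
		DataStream<Tuple1<String>> kafkaStream = context
				.addSource(new KafkaSource("localhost:2181", "test-topic")) // assumed (address, topic) signature
				.addSink(new SinkFunction<Tuple1<String>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public void invoke(Tuple1<String> tuple) {
						// Print each string message received from the Kafka topic.
						System.out.println(tuple);
					}
				});

		context.execute();
	}
}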
......@@ -15,32 +15,33 @@
package eu.stratosphere.streaming.state;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.index.IndexPair;
public class GraphState {
public Map<Integer, Set<Integer>> vertices = null;
public class LogTableStateIterator<K, V> implements TableStateIterator<K ,V>{
private Iterator<Entry<K, IndexPair>> iterator;
private HashMap<Integer, ArrayList<V>> blockList;
public LogTableStateIterator(Iterator<Entry<K, IndexPair>> iter, HashMap<Integer, ArrayList<V>> blocks){
iterator=iter;
blockList=blocks;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
public GraphState() {
vertices = new HashMap<Integer, Set<Integer>>();
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
return null;
public void insertDirectedEdge(int sourceNode, int targetNode) {
if (!vertices.containsKey(sourceNode)) {
vertices.put(sourceNode, new HashSet<Integer>());
}
vertices.get(sourceNode).add(targetNode);
}
public void insertUndirectedEdge(int sourceNode, int targetNode){
if(!vertices.containsKey(sourceNode)){
vertices.put(sourceNode, new HashSet<Integer>());
}
if(!vertices.containsKey(targetNode)){
vertices.put(targetNode, new HashSet<Integer>());
}
vertices.get(sourceNode).add(targetNode);
vertices.get(targetNode).add(sourceNode);
}
}
......@@ -15,55 +15,48 @@
package eu.stratosphere.streaming.state;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableTableState<K, V> implements TableState<K, V> {
public class MutableTableState<K, V> implements TableState<K, V>, Serializable {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
public void put(K key, V value) {
// TODO Auto-generated method stub
state.put(key, value);
}
@Override
public V get(K key) {
// TODO Auto-generated method stub
return state.get(key);
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
state.remove(key);
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return state.containsKey(key);
}
@Override
public MutableTableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
@Override
public String serialize() {
// TODO Auto-generated method stub
return null;
}
@Override
public void deserialize(String str) {
// TODO Auto-generated method stub
}
}
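As a quick illustration of the state abstraction above, here is a minimal usage sketch (not part of the commit; the class name is illustrative) exercising the operations the window operators in this commit rely on: put, get, containsKey, delete and getIterator.

import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;

// Usage sketch only -- demonstrates the MutableTableState operations shown above.
public class MutableTableStateSketch {
	public static void main(String[] args) {
		MutableTableState<String, Integer> counts = new MutableTableState<String, Integer>();

		counts.put("sum", 0);
		counts.put("sum", counts.get("sum") + 5); // overwrite the key with the updated value

		if (counts.containsKey("sum")) {
			System.out.println("sum = " + counts.get("sum")); // prints: sum = 5
		}

		// Iterate over all entries as Tuple2<key, value> pairs.
		MutableTableStateIterator<String, Integer> iterator = counts.getIterator();
		while (iterator.hasNext()) {
			Tuple2<String, Integer> entry = iterator.next();
			System.out.println(entry.f0 + " -> " + entry.f1);
		}

		counts.delete("sum");
	}
}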
......@@ -29,13 +29,11 @@ public class MutableTableStateIterator<K, V> implements TableStateIterator<K, V>
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return iterator.hasNext();
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue());
}
......
......@@ -15,9 +15,10 @@
package eu.stratosphere.streaming.state;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import java.io.Serializable;
import java.util.ArrayList;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
/**
* The window state for window operator. To be general enough, this class
......@@ -25,7 +26,8 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class SlidingWindowState<K> {
public class SlidingWindowState implements Serializable{
private static final long serialVersionUID = -2376149970115888901L;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
......@@ -41,13 +43,13 @@ public class SlidingWindowState<K> {
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
buffer.add(record);
public void pushBack(ArrayList tupleArray) {
buffer.add(tupleArray);
currentRecordCount += 1;
}
public StreamRecord popFront() {
StreamRecord frontRecord = (StreamRecord) buffer.get();
public ArrayList popFront() {
ArrayList frontRecord = (ArrayList) buffer.get();
buffer.remove();
return frontRecord;
}
......
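This sliding window state drives the incremental logic in WindowSumAggregate and WindowWordCountCounter above: one mini-batch of tuples is pushed per granularity step, and once the window is full the oldest mini-batch is popped so its contribution can be subtracted from the running aggregate. Below is a minimal sketch of that protocol (not part of the commit; the class name and constants are illustrative), using only the methods visible in this diff and a plain integer counter as the aggregate; for simplicity each mini-batch holds a single tuple.

import java.util.ArrayList;

import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.SlidingWindowState;

// Sketch of the push/pop protocol used by the window operators in this commit.
public class SlidingWindowStateSketch {
	public static void main(String[] args) {
		SlidingWindowState window = new SlidingWindowState(10, 2, 1); // windowSize, slidingStep, granularity
		int sum = 0;

		for (int step = 0; step < 30; ++step) {
			// One mini-batch per granularity step; here it holds a single (value, timestamp) tuple.
			ArrayList<Tuple2<Integer, Long>> miniBatch = new ArrayList<Tuple2<Integer, Long>>();
			miniBatch.add(new Tuple2<Integer, Long>(step, (long) step));

			if (window.isFull()) {
				// Evict the oldest mini-batch and subtract its contribution from the running sum.
				@SuppressWarnings("unchecked")
				ArrayList<Tuple2<Integer, Long>> expired =
						(ArrayList<Tuple2<Integer, Long>>) window.popFront();
				sum += miniBatch.get(0).f0;
				for (Tuple2<Integer, Long> expiredTuple : expired) {
					sum -= expiredTuple.f0;
				}
				window.pushBack(miniBatch);
				if (window.isEmittable()) {
					System.out.println("window sum at step " + step + " = " + sum);
				}
			} else {
				sum += miniBatch.get(0).f0;
				window.pushBack(miniBatch);
				if (window.isFull()) {
					System.out.println("window sum at step " + step + " = " + sum);
				}
			}
		}
	}
}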
......@@ -21,12 +21,10 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class SlidingWindowStateIterator<K>{
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
public Tuple2<K, StreamRecord> next() {
// TODO Auto-generated method stub
return null;
}
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface TableState<K, V> {
public interface TableState<K, V>{
public void put(K key, V value);
public V get(K key);
public void delete(K key);
......
......@@ -6,7 +6,6 @@ public class MyGeneric2 extends MyGeneric<Tuple1<Integer>> {
@Override
public void asd() {
// TODO Auto-generated method stub
}
......
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.state;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
......@@ -53,37 +52,9 @@ public class InternalStateTest {
}
}
@Test
public void LogTableStateTest(){
LogTableState<String, String> state=new LogTableState<String, String>();
state.put("abc", "hello");
state.put("test", "world");
state.put("state", "mutable");
state.put("streaming", "persist");
String s=state.get("streaming");
if(s==null){
System.out.println("key does not exist!");
}
else{
System.out.println("value="+s);
}
s=state.get("null");
if(s==null){
System.out.println("key does not exist!");
}
else{
System.out.println("value="+s);
}
TableStateIterator<String, String> iterator=state.getIterator();
while(iterator.hasNext()){
Tuple2<String, String> tuple=iterator.next();
System.out.println(tuple.getField(0)+", "+tuple.getField(1));
}
}
@Test
public void WindowStateTest(){
SlidingWindowState<String> state=new SlidingWindowState<String>(100, 20, 10);
SlidingWindowState state=new SlidingWindowState(100, 20, 10);
}
}