提交 b5dfc948 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] update iterative jobs

上级 c92727b8
......@@ -87,7 +87,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
......
......@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class CollaborativeFilteringLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new CollaborativeFilteringSource());
graphBuilder.setTask("Task", new CollaborativeFilteringTask(), 1, 1);
graphBuilder.setSink("Sink", new CollaborativeFilteringSink());
public static void main(String[] args) {
// TODO Auto-generated method stub
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -15,15 +15,23 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CFSource extends UserSourceInvokable {
public class CollaborativeFilteringSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke() throws Exception {
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CollaborativeFilteringSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = new String();
private StreamRecord outRecord = new StreamRecord(new Tuple3<Integer, Integer, Integer>());
@Override
public void invoke() throws Exception {
// TODO Auto-generated method stub
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/MovieLens100k.data"));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
String[] items=line.split("\t");
outRecord.setInteger(0, Integer.valueOf(items[0]));
outRecord.setInteger(1, Integer.valueOf(items[1]));
outRecord.setInteger(2, Integer.valueOf(items[2]));
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.util.HashMap;
import org.jblas.DoubleMatrix;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CollaborativeFilteringTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
HashMap<Integer, Integer> rowIndex=new HashMap<Integer, Integer>();
HashMap<Integer, Integer> columnIndex=new HashMap<Integer, Integer>();
DoubleMatrix userItem=new DoubleMatrix(1000, 2000);
DoubleMatrix coOccurence=new DoubleMatrix(2000, 2000);
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
int userId = record.getInteger(0, 0);
int itemId = record.getInteger(0, 1);
int rating = record.getInteger(0, 2);
if(!rowIndex.containsKey(userId)){
rowIndex.put(userId, rowIndex.size());
}
if(!columnIndex.containsKey(itemId)){
columnIndex.put(itemId, columnIndex.size());
}
userItem.put(rowIndex.get(userId), columnIndex.get(itemId), rating);
//outRecord.setString(0, line);
}
}
......@@ -15,11 +15,36 @@
package eu.stratosphere.streaming.examples.iterative.kmeans;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class KMeansLocal {
public static JobGraph getJobGraph() {
int numCenter=3;
int dimension=2;
double stddev = 0.08;
double range = 100.0;
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source",new KMeansSource(numCenter, dimension, stddev, range));
graphBuilder.setTask("Task", new KMeansTask(dimension), 1, 1);
graphBuilder.setSink("Sink", new KMeansSink());
public static void main(String[] args) {
// TODO Auto-generated method stub
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -13,17 +13,20 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CFSink extends UserSinkInvokable {
public class KMeansSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
//int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
System.out.println("record=" + record.getString(0, 0));
System.out.println("============================================");
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class KMeansSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private static final long DEFAULT_SEED = 4650285087650871364L;
private Random random = new Random(DEFAULT_SEED);
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private int numCenter;
private int dimension;
private double absoluteStdDev;
private double range;
private StringBuilder buffer = new StringBuilder();
public KMeansSource(int numCenter, int dimension, double stddev, double range){
this.numCenter=numCenter;
this.dimension=dimension;
this.absoluteStdDev = stddev * range;
this.range=range;
}
@Override
public void invoke() throws Exception {
// TODO Auto-generated method stub
double[][] means = uniformRandomCenters(random, numCenter, dimension, range);
double[] point = new double[dimension];
int nextCentroid = 0;
while (true) {
// generate a point for the current centroid
double[] centroid = means[nextCentroid];
for (int d = 0; d < dimension; d++) {
point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
}
nextCentroid = (nextCentroid + 1) % numCenter;
String pointString=generatePointString(point);
outRecord.setString(0, pointString);
emit(outRecord);
}
}
private double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
final double halfRange = range / 2;
final double[][] points = new double[num][dimensionality];
for (int i = 0; i < num; i++) {
for (int dim = 0; dim < dimensionality; dim ++) {
points[i][dim] = (rnd.nextDouble() * range) - halfRange;
}
}
return points;
}
private String generatePointString(double[] point){
buffer.setLength(0);
for (int j = 0; j < dimension; j++) {
buffer.append(point[j]);
if(j < dimension - 1) {
buffer.append(" ");
}
}
return buffer.toString();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class KMeansTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private double[] point=null;
public KMeansTask(int dimension){
point = new double[dimension];
}
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
String[] pointStr = record.getString(0, 0).split(" ");
for(int i=0; i<pointStr.length; ++i){
point[i]=Double.valueOf(pointStr[i]);
}
outRecord.setString(0, record.getString(0, 0));
emit(outRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class Graph {
public Map<Integer, Set<Integer>> _vertices = null;
public Graph() {
_vertices = new HashMap<Integer, Set<Integer>>();
}
public void insertDirectedEdge(int sourceNode, int targetNode) {
if (!_vertices.containsKey(sourceNode)) {
_vertices.put(sourceNode, new HashSet<Integer>());
}
_vertices.get(sourceNode).add(targetNode);
}
public void insertUndirectedEdge(int sourceNode, int targetNode){
if(!_vertices.containsKey(sourceNode)){
_vertices.put(sourceNode, new HashSet<Integer>());
}
if(!_vertices.containsKey(targetNode)){
_vertices.put(targetNode, new HashSet<Integer>());
}
_vertices.get(sourceNode).add(targetNode);
_vertices.get(targetNode).add(sourceNode);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class PageRankLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new PageRankSource());
graphBuilder.setTask("Task", new PageRankTask(), 1, 1);
graphBuilder.setSink("Sink", new PageRankSink());
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class PageRankSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class PageRankSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private StreamRecord outRecord = new StreamRecord(new Tuple2<Integer, Integer>());
@Override
public void invoke() throws Exception {
// TODO Auto-generated method stub
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/ASTopology.data"));
while (true) {
String line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
String[] link=line.split(":");
outRecord.setInteger(0, Integer.valueOf(link[0]));
outRecord.setInteger(0, Integer.valueOf(link[1]));
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
}
}
......@@ -15,11 +15,25 @@
package eu.stratosphere.streaming.examples.iterative.pagerank;
public class PagerankLocal {
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public static void main(String[] args) {
// TODO Auto-generated method stub
public class PageRankTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private Graph linkGraph = new Graph();
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
Integer sourceNode = record.getInteger(0, 0);
Integer targetNode = record.getInteger(0, 1);
// set the input graph.
linkGraph.insertDirectedEdge(sourceNode, targetNode);
//outRecord.setString(0, line);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class Graph {
public Map<Integer, Set<Integer>> _vertices = null;
public Graph() {
_vertices = new HashMap<Integer, Set<Integer>>();
}
public void insertDirectedEdge(int sourceNode, int targetNode) {
if (!_vertices.containsKey(sourceNode)) {
_vertices.put(sourceNode, new HashSet<Integer>());
}
_vertices.get(sourceNode).add(targetNode);
}
public void insertUndirectedEdge(int sourceNode, int targetNode){
if(!_vertices.containsKey(sourceNode)){
_vertices.put(sourceNode, new HashSet<Integer>());
}
if(!_vertices.containsKey(targetNode)){
_vertices.put(targetNode, new HashSet<Integer>());
}
_vertices.get(sourceNode).add(targetNode);
_vertices.get(targetNode).add(sourceNode);
}
}
......@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.sssp;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class SSSPLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new SSSPSource());
graphBuilder.setTask("Task", new SSSPTask(), 1, 1);
graphBuilder.setSink("Sink", new SSSPSink());
public static void main(String[] args) {
// TODO Auto-generated method stub
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class SSSPSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class SSSPSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private StreamRecord outRecord = new StreamRecord(new Tuple2<Integer, Integer>());
@Override
public void invoke() throws Exception {
// TODO Auto-generated method stub
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/ASTopology.data"));
while (true) {
String line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
String[] link=line.split(":");
outRecord.setInteger(0, Integer.valueOf(link[0]));
outRecord.setInteger(0, Integer.valueOf(link[1]));
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
}
}
......@@ -13,19 +13,27 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CFTask extends UserTaskInvokable {
public class SSSPTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
private Graph linkGraph = new Graph();
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
Integer sourceNode = record.getInteger(0, 0);
Integer targetNode = record.getInteger(0, 1);
// set the input graph.
linkGraph.insertDirectedEdge(sourceNode, targetNode);
//outRecord.setString(0, line);
}
}
......@@ -15,16 +15,12 @@
package eu.stratosphere.streaming.examples.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class JoinLocal {
......@@ -46,41 +42,7 @@ public class JoinLocal {
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -15,16 +15,12 @@
package eu.stratosphere.streaming.examples.window.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WindowJoinLocal {
......@@ -46,41 +42,7 @@ public class WindowJoinLocal {
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -15,15 +15,11 @@
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WindowSumLocal {
......@@ -45,41 +41,7 @@ public class WindowSumLocal {
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -15,16 +15,12 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class WindowWordCountLocal {
......@@ -46,41 +42,7 @@ public class WindowWordCountLocal {
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
......@@ -41,17 +41,6 @@ public class WordCountLocal {
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
}
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
public class helloworld {
public static void main(String[] args) {
// TODO Auto-generated method stub
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册