提交 44c378ae 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] stream-join modified for new api

上级 a7702c09
......@@ -15,72 +15,31 @@
package eu.stratosphere.streaming.examples.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.util.LogUtils;
public class JoinLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("JoinSourceOne", JoinSourceOne.class);
graphBuilder.setSource("JoinSourceTwo", JoinSourceTwo.class);
graphBuilder.setTask("JoinTask", JoinTask.class, 1, 1);
graphBuilder.setSink("JoinSink", JoinSink.class);
graphBuilder.fieldsConnect("JoinSourceOne", "JoinTask", 1);
graphBuilder.fieldsConnect("JoinSourceTwo", "JoinTask", 1);
graphBuilder.shuffleConnect("JoinTask", "JoinSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
exec.start();
DataStream<Tuple3<String, String, Integer>> source1 = context
.addSource(new JoinSourceOne()).partitionBy(1);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = context
.addSource(new JoinSourceTwo()).partitionBy(1).connectWith(source1)
.flatMap(new JoinTask()).addSink(new JoinSink());
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
context.execute();
}
}
......@@ -15,22 +15,19 @@
package eu.stratosphere.streaming.examples.join;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class JoinSink extends UserSinkInvokable {
public class JoinSink extends SinkFunction<Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
public void invoke(Tuple3<String, Integer, Integer> tuple) {
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
for (int i = 0; i < tupleNum; ++i) {
System.out.println("name=" + record.getField(i, 0) + ", grade="
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("name=" + tuple.f0 + ", grade=" + tuple.f1 + ", salary=" + tuple.f2);
System.out.println("============================================");
}
}
......@@ -18,27 +18,26 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class JoinSourceOne extends UserSourceInvokable {
public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer>> {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, String, Integer>());
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
while (true) {
outRecord.setString(0, "salary");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setInteger(2, rand.nextInt(10000));
emit(outRecord);
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
collector.collect(outRecord);
}
}
}
......@@ -18,27 +18,26 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class JoinSourceTwo extends UserSourceInvokable {
public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer>> {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, String, String>());
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
while (true) {
outRecord.setString(0, "grade");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A')));
emit(outRecord);
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
collector.collect(outRecord);
}
}
}
......@@ -18,55 +18,52 @@ package eu.stratosphere.streaming.examples.join;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class JoinTask extends UserTaskInvokable {
public class JoinTask extends
FlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 749913336259789039L;
private HashMap<String, ArrayList<String>> gradeHashmap;
private HashMap<String, ArrayList<Integer>> gradeHashmap;
private HashMap<String, ArrayList<Integer>> salaryHashmap;
private StreamRecord outRecord = new StreamRecord(3);
public JoinTask() {
gradeHashmap = new HashMap<String, ArrayList<String>>();
gradeHashmap = new HashMap<String, ArrayList<Integer>>();
salaryHashmap = new HashMap<String, ArrayList<Integer>>();
}
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
String streamId = record.getString(0);
String name = record.getString(1);
public void flatMap(Tuple3<String, String, Integer> value,
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
String streamId = value.f0;
String name = value.f1;
;
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
for (Integer salary : salaryHashmap.get(name)) {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, record.getString(2), salary);
outRecord.addTuple(outputTuple);
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, value.f2, salary);
out.collect(outputTuple);
}
emit(outRecord);
outRecord.Clear();
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new ArrayList<String>());
gradeHashmap.put(name, new ArrayList<Integer>());
}
gradeHashmap.get(name).add(record.getString(2));
gradeHashmap.get(name).add(value.f2);
} else {
if (gradeHashmap.containsKey(name)) {
for (String grade : gradeHashmap.get(name)) {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, grade, record.getInteger(2));
outRecord.addTuple(outputTuple);
for (Integer grade : gradeHashmap.get(name)) {
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, grade, value.f2);
out.collect(outputTuple);
}
emit(outRecord);
outRecord.Clear();
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new ArrayList<Integer>());
}
salaryHashmap.get(name).add(record.getInteger(2));
salaryHashmap.get(name).add(value.f2);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册