Commit 0b721125 authored by eszesd, committed by Stephan Ewen

[streaming] WindowJoin Example refactored

Parent commit: f74cb0a9
......@@ -19,28 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
public class WindowJoinSourceTwo implements SourceFunction<Tuple4<String, String, Integer, Long>> {
public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
outTuple.f3 = progress;
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(5) + 1;
outTuple.f2 = progress;
out.collect(outTuple);
progress += 1;
}
......
......@@ -19,28 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
public class WindowJoinSourceOne implements SourceFunction<Tuple4<String, String, Integer, Long>> {
public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
outTuple.f3 = progress;
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(10000);
outTuple.f2 = progress;
out.collect(outTuple);
progress += 1;
}
......
......@@ -18,11 +18,8 @@
package org.apache.flink.streaming.examples.window.join;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.LogUtils;
import org.apache.log4j.Level;
public class WindowJoinLocal {
......@@ -32,23 +29,22 @@ public class WindowJoinLocal {
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
//TODO update and reconsider
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
PARALLELISM).setBufferTimeout(100);
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM).setBufferTimeout(100);
DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
SOURCE_PARALLELISM);
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALLELISM);
DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
SOURCE_PARALLELISM);
@SuppressWarnings("unchecked")
DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALLELISM).merge(dataStream1)
.partitionBy(1).flatMap(new WindowJoinTask());
DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
.flatMap(new WindowJoinTask());
dataStream2.print();
System.out.println("(NAME, GRADE, SALARY)");
joinedStream.print();
env.execute();
......
......@@ -21,13 +21,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class WindowJoinTask extends
RichFlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
class SalaryProgress {
public SalaryProgress(Integer salary, Long progress) {
......@@ -53,60 +52,75 @@ public class WindowJoinTask extends
private int windowSize = 100;
private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
private String name;
private Long progress;
public WindowJoinTask() {
gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
name = new String();
progress = 0L;
}
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
// Joins the input value (grade) with the already known values (salaries) on
// a given interval.
// Also stores the new element.
@Override
public void flatMap(Tuple4<String, String, Integer, Long> value,
public void flatMap1(Tuple3<String, Integer, Long> value,
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
String streamId = value.f0;
String name = value.f1;
Long progress = value.f3;
name = value.f0;
progress = value.f2;
outputTuple.f0 = name;
outputTuple.f1 = value.f1;
// Joins the input value with the already known values on a given interval. If it is a grade
// then with the salaries, if it is a salary then with the grades. Also
// stores the new element.
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
while (iterator.hasNext()) {
SalaryProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, value.f2, entry.salary);
out.collect(outputTuple);
}
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
while (iterator.hasNext()) {
SalaryProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
outputTuple.f2 = entry.salary;
out.collect(outputTuple);
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new LinkedList<GradeProgress>());
}
gradeHashmap.get(name).add(new GradeProgress(value.f2, progress));
} else {
if (gradeHashmap.containsKey(name)) {
Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
while (iterator.hasNext()) {
GradeProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, entry.grade, value.f2);
out.collect(outputTuple);
}
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new LinkedList<GradeProgress>());
}
gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
}
}
}
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new LinkedList<SalaryProgress>());
// Joins the input value (salary) with the already known values (grades) on
// a given interval.
// Also stores the new element.
@Override
public void flatMap2(Tuple3<String, Integer, Long> value,
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
name = value.f0;
progress = value.f2;
outputTuple.f0 = name;
outputTuple.f2 = value.f1;
if (gradeHashmap.containsKey(name)) {
Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
while (iterator.hasNext()) {
GradeProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
outputTuple.f1 = entry.grade;
out.collect(outputTuple);
}
salaryHashmap.get(name).add(new SalaryProgress(value.f2, progress));
}
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new LinkedList<SalaryProgress>());
}
salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Please finish editing this message first!
To comment, please register.