提交 271071ad 编写于 作者: S Stephan Ewen

[FLINK-3550] [examples] Rework WindowJoin to properly demonstrate continuous...

[FLINK-3550] [examples] Rework WindowJoin to properly demonstrate continuous window joins (Java + Scala)
上级 434e88fd
......@@ -182,7 +182,7 @@ under the License.
</archive>
<includes>
<include>org/apache/flink/streaming/examples/iteration/*.class</include>
<include>org/apache/flink/streaming/examples/iteration/*.class</include>
</includes>
</configuration>
</execution>
......@@ -250,7 +250,7 @@ under the License.
</archive>
<includes>
<include>org/apache/flink/streaming/examples/join/*.class</include>
<include>org/apache/flink/streaming/examples/join/*.class</include>
</includes>
</configuration>
</execution>
......@@ -274,7 +274,7 @@ under the License.
<includes>
<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>
......
......@@ -18,38 +18,27 @@
package org.apache.flink.streaming.examples.join;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
/**
* Example illustrating join over sliding windows of streams in Flink.
* Example illustrating a windowed stream join between two data streams.
*
* <p>The example works on two input streams with pairs (name, grade) and (name, salary)
* respectively. It joins the streams based on "name" within a configurable window.
*
* <p>
* This example will join two streams with a sliding window. One which emits grades and one which
* emits salaries of people. The input format for both sources has an additional timestamp
* as field 0. This is used to do event-time windowing. The timestamps must be
* monotonically increasing.
*
* This example shows how to:
* <ul>
* <li>do windowed joins,
* <li>use tuple data types,
* <li>write a simple streaming program.
* </ul>
* <p>The example uses a built-in sample data generator that generates
* the streams of pairs at a configurable rate.
*/
@SuppressWarnings("serial")
public class WindowJoin {
......@@ -59,200 +48,62 @@ public class WindowJoin {
// *************************************************************************
public static void main(String[] args) throws Exception {
// Checking input parameters
// parse the parameters
final ParameterTool params = ParameterTool.fromArgs(args);
System.out.println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>");
final long windowSize = params.getLong("windowSize", 2000);
final long rate = params.getLong("rate", 3L);
System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate);
System.out.println("To customize example, use: WindowJoin [--windowSize <window-size-in-millis>] [--rate <elements-per-second>]");
// obtain execution environment
// obtain execution environment, run this example in "ingestion time"
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// connect to the data sources for grades and salaries
DataStream<Tuple3<Long, String, Integer>> grades = getGradesPath(env, params);
DataStream<Tuple3<Long, String, Integer>> salaries = getSalariesPath(env, params);
// create the data sources for both grades and salaries
DataStream<Tuple2<String, Integer>> grades = GradeSource.getSource(env, rate);
DataStream<Tuple2<String, Integer>> salaries = SalarySource.getSource(env, rate);
// run the actual window join program
// for testability, this functionality is in a separate method.
DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(grades, salaries, windowSize);
// extract the timestamps
grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor());
salaries = salaries.assignTimestampsAndWatermarks(new MyTimestampExtractor());
// apply a temporal join over the two stream based on the names over one
// second windows
DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
.join(salaries)
.where(new NameKeySelector())
.equalTo(new NameKeySelector())
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
.apply(new MyJoinFunction());
// emit result
if (params.has("output")) {
joinedStream.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
joinedStream.print();
}
// print the results with a single thread, rather than in parallel
joinedStream.print().setParallelism(1);
// execute program
env.execute("Windowed Join Example");
}
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
DataStream<Tuple2<String, Integer>> grades,
DataStream<Tuple2<String, Integer>> salaries,
long windowSize) {
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
private final static int GRADE_COUNT = 5;
private final static int SALARY_MAX = 10000;
private final static int SLEEP_TIME = 10;
/**
 * Source that emits (timestamp, name, grade) tuples: the timestamp is the current
 * system time, the name is picked at random from {@code names} and the grade is a
 * random value in {@code [1, GRADE_COUNT]}. Emission stops after 100 tuples or as
 * soon as the source is cancelled.
 */
public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {

    private static final long serialVersionUID = 1L;

    private Random rand;
    // a single tuple instance is re-filled and re-emitted for every record
    private Tuple3<Long, String, Integer> outTuple;
    private volatile boolean isRunning = true;
    private int counter;

    public GradeSource() {
        this.rand = new Random();
        this.outTuple = new Tuple3<>();
    }

    @Override
    public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
        while (true) {
            if (!isRunning || counter >= 100) {
                break;
            }

            outTuple.f0 = System.currentTimeMillis();
            outTuple.f1 = names[rand.nextInt(names.length)];
            outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;

            // pause for a random 1..SLEEP_TIME milliseconds before handing the tuple downstream
            final int pauseMillis = rand.nextInt(SLEEP_TIME) + 1;
            Thread.sleep(pauseMillis);

            counter++;
            ctx.collect(outTuple);
        }
    }

    @Override
    public void cancel() {
        // observed by the emit loop in run() on its next iteration
        isRunning = false;
    }
}
/**
 * Source that emits (timestamp, name, salary) tuples: the timestamp is the current
 * system time, the name is picked at random from {@code names} and the salary is a
 * random value in {@code [1, SALARY_MAX]}. Emission stops after 100 tuples or as
 * soon as the source is cancelled.
 *
 * <p>The random generator and the reused output tuple are transient and are created
 * in {@link #open(Configuration)}.
 */
public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {

    private static final long serialVersionUID = 1L;

    private transient Random rand;
    // a single tuple instance is re-filled and re-emitted for every record
    private transient Tuple3<Long, String, Integer> outTuple;
    private volatile boolean isRunning;
    private int counter;

    /**
     * Creates the transient helpers and marks the source as running.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass initialization fails
     */
    // @Override makes the compiler verify that this really overrides the lifecycle hook;
    // without it a signature drift would leave rand/outTuple null when run() executes
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        rand = new Random();
        outTuple = new Tuple3<>();
        isRunning = true;
    }

    @Override
    public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
        while (isRunning && counter < 100) {
            outTuple.f0 = System.currentTimeMillis();
            outTuple.f1 = names[rand.nextInt(names.length)];
            outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
            // pause for a random 1..SLEEP_TIME milliseconds before emitting
            Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
            counter++;
            ctx.collect(outTuple);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
/**
 * Parses one text line of the form {@code (timestamp,name,value)} into a
 * (timestamp, name, value) tuple.
 *
 * <p>The first and last character of the line are dropped (the surrounding
 * parentheses) and the remainder is split on commas. Every field is trimmed, so
 * records with padding after a comma, such as {@code (3, tom,2)}, yield the name
 * {@code "tom"} rather than {@code " tom"}.
 */
public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {

    private static final long serialVersionUID = 1L;

    /**
     * @param line a record of the form {@code (timestamp,name,value)}
     * @return the parsed tuple
     * @throws Exception if the line has fewer than three fields or a numeric field is malformed
     */
    @Override
    public Tuple3<Long, String, Integer> map(String line) throws Exception {
        // the split result is local: keeping it in an instance field only added mutable state
        final String[] fields = line.substring(1, line.length() - 1).split(",");
        return new Tuple3<>(
                Long.parseLong(fields[0].trim()),
                fields[1].trim(),
                Integer.parseInt(fields[2].trim()));
    }
}
/**
 * Combines a (timestamp, name, grade) record with a (timestamp, name, salary) record
 * into a (name, grade, salary) tuple. The name and grade come from the first input,
 * the salary from the second. One result tuple instance is reused across calls.
 */
public static class MyJoinFunction
        implements JoinFunction<
                Tuple3<Long, String, Integer>,
                Tuple3<Long, String, Integer>,
                Tuple3<String, Integer, Integer>> {

    private static final long serialVersionUID = 1L;

    private Tuple3<String, Integer, Integer> joined = new Tuple3<>();

    @Override
    public Tuple3<String, Integer, Integer> join(
            Tuple3<Long, String, Integer> first,
            Tuple3<Long, String, Integer> second) throws Exception {
        // overwrite all three fields of the shared result in one step
        joined.setFields(first.f1, first.f2, second.f2);
        return joined;
    }
}
private static class MyTimestampExtractor extends AscendingTimestampExtractor<Tuple3<Long, String, Integer>> {
@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
return grades.join(salaries)
.where(new NameKeySelector())
.equalTo(new NameKeySelector())
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> join(
Tuple2<String, Integer> first,
Tuple2<String, Integer> second) {
return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
}
});
}
private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
private static final long serialVersionUID = 1L;
private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
@Override
public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
return value.f1;
public String getKey(Tuple2<String, Integer> value) {
return value.f0;
}
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
/**
 * Builds the grades stream: read and parsed from the file given by {@code --grades}
 * when that parameter is present, otherwise produced by the built-in {@link GradeSource}.
 */
private static DataStream<Tuple3<Long, String, Integer>> getGradesPath(StreamExecutionEnvironment env, ParameterTool params) {
    if (!params.has("grades")) {
        // no input file configured: fall back to generated data
        System.out.println("Executing WindowJoin example with default grades data set.");
        System.out.println("Use --grades to specify file input.");
        return env.addSource(new GradeSource());
    }

    final String gradesFile = params.get("grades");
    return env.readTextFile(gradesFile).map(new MySourceMap());
}
/**
 * Builds the salaries stream: read and parsed from the file given by {@code --salaries}
 * when that parameter is present, otherwise produced by the built-in {@link SalarySource}.
 */
private static DataStream<Tuple3<Long, String, Integer>> getSalariesPath(StreamExecutionEnvironment env, ParameterTool params) {
    if (!params.has("salaries")) {
        // no input file configured: fall back to generated data
        System.out.println("Executing WindowJoin example with default salaries data set.");
        System.out.println("Use --salaries to specify file input.");
        return env.addSource(new SalarySource());
    }

    final String salariesFile = params.get("salaries");
    return env.readTextFile(salariesFile).map(new MySourceMap());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.join;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.utils.ThrottledIterator;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;
/**
 * Random sample-data generators used as input for the {@link WindowJoin} example.
 */
@SuppressWarnings("serial")
public class WindowJoinSampleData {

    static final String[] NAMES = {"tom", "jerry", "alice", "bob", "john", "grace"};
    static final int GRADE_COUNT = 5;
    static final int SALARY_MAX = 10000;

    /**
     * Never-ending iterator of (name, grade) pairs. Names are drawn from {@link #NAMES},
     * grades from {@code [1, GRADE_COUNT]}.
     */
    public static class GradeSource implements Iterator<Tuple2<String, Integer>>, Serializable {

        // seeded with this instance's hash code
        private final Random rnd = new Random(hashCode());

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Tuple2<String, Integer> next() {
            final String name = NAMES[rnd.nextInt(NAMES.length)];
            final int grade = rnd.nextInt(GRADE_COUNT) + 1;
            return new Tuple2<>(name, grade);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a stream of generated grades, throttled to {@code rate} elements per second.
         */
        public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
            final Iterator<Tuple2<String, Integer>> throttled =
                    new ThrottledIterator<>(new GradeSource(), rate);
            final TypeInformation<Tuple2<String, Integer>> pairType =
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
            return env.fromCollection(throttled, pairType);
        }
    }

    /**
     * Never-ending iterator of (name, salary) pairs. Names are drawn from {@link #NAMES},
     * salaries from {@code [1, SALARY_MAX]}.
     */
    public static class SalarySource implements Iterator<Tuple2<String, Integer>>, Serializable {

        // seeded with this instance's hash code
        private final Random rnd = new Random(hashCode());

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Tuple2<String, Integer> next() {
            final String name = NAMES[rnd.nextInt(NAMES.length)];
            final int salary = rnd.nextInt(SALARY_MAX) + 1;
            return new Tuple2<>(name, salary);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a stream of generated salaries, throttled to {@code rate} elements per second.
         */
        public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
            final Iterator<Tuple2<String, Integer>> throttled =
                    new ThrottledIterator<>(new SalarySource(), rate);
            final TypeInformation<Tuple2<String, Integer>> pairType =
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
            return env.fromCollection(throttled, pairType);
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.join.util;
/**
 * Static sample input for the window join example, held as newline-separated text.
 *
 * <p>Every record is written as {@code (n,name,value)}: a leading integer, a name and
 * an integer value, wrapped in parentheses. The leading integer is presumably the
 * timestamp that the file-based WindowJoin input parses as field 0 - TODO confirm.
 */
public class WindowJoinData {
/**
 * Grade records; the leading integers run from 0 to 14 and the values from 1 to 5.
 *
 * <p>NOTE(review): the record {@code (3, tom,2)} has a space before the name, so a
 * parser that does not trim fields reads the name as {@code " tom"} - confirm whether
 * that is intended.
 */
public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
"(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
/**
 * Salary records; the leading integers run from 0 to 19, i.e. further than those
 * of {@link #GRADES_INPUT}.
 */
public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
"(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
"(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
"(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
"(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
// private constructor: this class only holds constants and is not meant to be instantiated
private WindowJoinData() {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.utils;
import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;
/**
 * An iterator wrapper that limits how fast elements are handed out by the wrapped
 * iterator, so that a collection source emits at roughly a configured rate.
 *
 * <p>Throttling works in batches: after every {@code sleepBatchSize} elements the
 * iterator checks how long the batch took and sleeps for the remainder of
 * {@code sleepBatchTime} milliseconds if the batch finished early.
 *
 * @param <T> the type of the elements returned by this iterator
 */
public class ThrottledIterator<T> implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 1L;

    @SuppressWarnings("NonSerializableFieldInSerializableClass")
    private final Iterator<T> source;

    /** Number of elements handed out between two pacing checks. */
    private final long sleepBatchSize;

    /** Minimum duration of one batch, in milliseconds. */
    private final long sleepBatchTime;

    /** Start time of the current batch in milliseconds; 0 until the first element is requested. */
    private long lastBatchCheckTime;

    /** Elements handed out in the current batch. */
    private long num;

    /**
     * Wraps the given iterator.
     *
     * @param source the iterator to throttle; must also implement {@link Serializable}
     * @param elementsPerSecond the target emission rate, at least 1
     * @throws NullPointerException if {@code source} is null
     * @throws IllegalArgumentException if {@code source} is not serializable or the
     *         rate is smaller than 1
     */
    public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
        this.source = requireNonNull(source);

        if (!(source instanceof Serializable)) {
            throw new IllegalArgumentException("source must be java.io.Serializable");
        }

        if (elementsPerSecond >= 100) {
            // high rates: pace in 50ms batches, i.e. 1/20th of the per-second amount each
            this.sleepBatchSize = elementsPerSecond / 20;
            this.sleepBatchTime = 50;
        }
        else if (elementsPerSecond >= 1) {
            // low rates: pace every single element
            this.sleepBatchSize = 1;
            this.sleepBatchTime = 1000 / elementsPerSecond;
        }
        else {
            throw new IllegalArgumentException("'elements per second' must be positive and not zero");
        }
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    /**
     * Returns the next element of the wrapped iterator, blocking first if the current
     * batch was consumed faster than the configured rate allows.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and
     * the element is returned without further delay.
     */
    @Override
    public T next() {
        if (lastBatchCheckTime > 0) {
            if (++num >= sleepBatchSize) {
                num = 0;

                final long now = System.currentTimeMillis();
                final long elapsed = now - lastBatchCheckTime;

                if (elapsed < sleepBatchTime) {
                    try {
                        Thread.sleep(sleepBatchTime - elapsed);
                    } catch (InterruptedException e) {
                        // restore interrupt flag and proceed
                        Thread.currentThread().interrupt();
                    }
                    // The next batch starts only after the pause. Re-using the pre-sleep
                    // timestamp would count the pause as time spent by the next batch, so
                    // every second batch would skip its delay and the rate would double.
                    lastBatchCheckTime = System.currentTimeMillis();
                } else {
                    lastBatchCheckTime = now;
                }
            }
        } else {
            // first element: just start the clock
            lastBatchCheckTime = System.currentTimeMillis();
        }

        return source.next();
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
......@@ -18,112 +18,79 @@
package org.apache.flink.streaming.scala.examples.join
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import scala.Stream._
import scala.language.postfixOps
import scala.util.Random
/**
* Example illustrating a windowed stream join between two data streams.
*
* The example works on two input streams with pairs (name, grade) and (name, salary)
* respectively. It joins the streams based on "name" within a configurable window.
*
* The example uses a built-in sample data generator that generates
* the streams of pairs at a configurable rate.
*/
object WindowJoin {
// *************************************************************************
// PROGRAM
// Program Data Types
// *************************************************************************
case class Grade(time: Long, name: String, grade: Int)
case class Salary(time: Long, name: String, salary: Int)
case class Grade(name: String, grade: Int)
case class Salary(name: String, salary: Int)
case class Person(name: String, grade: Int, salary: Int)
def main(args: Array[String]) {
// *************************************************************************
// Program
// *************************************************************************
def main(args: Array[String]) {
// parse the parameters
val params = ParameterTool.fromArgs(args)
println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>")
val windowSize = params.getLong("windowSize", 2000)
val rate = params.getLong("rate", 3)
println("Using windowSize=" + windowSize + ", data rate=" + rate)
println("To customize example, use: WindowJoin " +
"[--windowSize <window-size-in-millis>] [--rate <elements-per-second>]")
// obtain execution environment, run this example in "ingestion time"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// Create streams for grades and salaries by mapping the inputs to the corresponding objects
val grades = setGradesDataStream(env, params)
val salaries = setSalariesDataStream(env, params)
// create the data sources for both grades and salaries
val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)
//Join the two input streams by name on the last 2 seconds every second and create new
//Person objects containing both grade and salary
val joined = grades.join(salaries)
.where(_.name)
.equalTo(_.name)
.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
.apply { (g, s) => Person(g.name, g.grade, s.salary) }
// join the two input streams by name on a window.
// for testability, this functionality is in a separate method.
val joined = joinStreams(grades, salaries, windowSize)
if (params.has("output")) {
joined.writeAsText(params.get("output"))
} else {
println("Printing result to stdout. Use --output to specify output path.")
joined.print()
}
// print the results with a single thread, rather than in parallel
joined.print().setParallelism(1)
// execute program
env.execute("WindowJoin")
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
val gradeCount = 5
val salaryMax = 10000
val sleepInterval = 100
/**
 * Lazily builds (timestamp, name, grade) triples for the indices 1 to 99. The name is
 * picked at random from `names`, the grade is a random value below `gradeCount`.
 * Whenever an index is a multiple of `sleepInterval`, the thread first sleeps for
 * `sleepInterval` milliseconds.
 */
def gradeStream: Stream[(Long, String, Int)] = {
  def toGrade(candidates: Array[String])(index: Int): (Long, String, Int) = {
    if (index % sleepInterval == 0) {
      Thread.sleep(sleepInterval)
    }
    val timestamp = System.currentTimeMillis()
    val name = candidates(Random.nextInt(candidates.length))
    val grade = Random.nextInt(gradeCount)
    (timestamp, name, grade)
  }

  range(1, 100).map(toGrade(names))
}
/**
 * Lazily builds (timestamp, name, salary) triples for the indices 1 to 99. The name is
 * picked at random from `names`, the salary is a random value below `salaryMax`.
 * Whenever an index is a multiple of `sleepInterval`, the thread first sleeps for
 * `sleepInterval` milliseconds.
 */
def salaryStream: Stream[(Long, String, Int)] = {
  def toSalary(index: Int): (Long, String, Int) = {
    if (index % sleepInterval == 0) {
      Thread.sleep(sleepInterval)
    }
    val timestamp = System.currentTimeMillis()
    val name = names(Random.nextInt(names.length))
    val salary = Random.nextInt(salaryMax)
    (timestamp, name, salary)
  }

  range(1, 100).map(toSalary)
}
/**
 * Parses one text line of the form `(timestamp,name,value)` into a triple.
 *
 * The first and last character (the surrounding parentheses) are dropped and the rest
 * is split on commas. Every field is trimmed, so a record with padding after a comma,
 * such as `(3, tom,2)`, yields the name `tom` rather than ` tom`.
 *
 * @param line a record of the form `(timestamp,name,value)`
 * @return the parsed (timestamp, name, value) triple
 */
def parseMap(line : String): (Long, String, Int) = {
  val fields = line.substring(1, line.length - 1).split(",").map(_.trim)
  (fields(0).toLong, fields(1), fields(2).toInt)
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
/**
 * Builds the grades stream: parsed from the file given by `--grades` when that
 * parameter is present, otherwise taken from the generated `gradeStream`.
 */
private def setGradesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
DataStream[Grade] = {
  val triples: DataStream[(Long, String, Int)] =
    if (params.has("grades")) {
      env.readTextFile(params.get("grades")).map(parseMap _ )
    } else {
      println("Executing WindowJoin example with default grades data set.")
      println("Use --grades to specify file input.")
      env.fromCollection(gradeStream)
    }

  triples.map(x => Grade(x._1, x._2, x._3))
}
private def setSalariesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
DataStream[Salary] = {
if (params.has("salaries")) {
env.readTextFile(params.get("salaries")).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
} else {
println("Executing WindowJoin example with default salaries data set.")
println("Use --salaries to specify file input.")
env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
}
/**
 * Joins grades and salaries on the person's name inside tumbling event-time windows.
 *
 * @param grades the stream of grades
 * @param salaries the stream of salaries
 * @param windowSize the window length in milliseconds
 * @return one `Person` per matching (grade, salary) pair within a window
 */
def joinStreams(
    grades: DataStream[Grade],
    salaries: DataStream[Salary],
    windowSize: Long) : DataStream[Person] = {

  val windows = TumblingEventTimeWindows.of(Time.milliseconds(windowSize))

  grades.join(salaries)
    .where(grade => grade.name)
    .equalTo(salary => salary.name)
    .window(windows)
    .apply { (grade, salary) => Person(grade.name, grade.grade, salary.salary) }
}
}
......@@ -16,38 +16,61 @@
* limitations under the License.
*/
package org.apache.flink.streaming.test.exampleScalaPrograms.join;
import org.apache.flink.streaming.scala.examples.join.WindowJoin;
import org.apache.flink.streaming.examples.join.util.WindowJoinData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class WindowJoinITCase extends StreamingProgramTestBase {
protected String gradesPath;
protected String salariesPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
    // write both join inputs to temporary files, then pick the result location
    this.gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
    this.salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
    this.resultPath = getTempDirPath("result");
}
@Override
protected void postSubmit() throws Exception {
    // The two join inputs may progress at different speeds, so the exact output is
    // not predictable. Only the shape of each line is verified: a lower-case name,
    // a one-digit grade and a salary made of digits, e.g. Person(bob,2,2015).
    final String personLinePattern = "^Person\\([a-z]+,(\\d),(\\d)+\\)";
    checkLinesAgainstRegexp(resultPath, personLinePattern);
}
@Override
protected void testProgram() throws Exception {
    // run the example against the staged input files, writing to the result path
    final String[] programArgs = {
        "--grades", gradesPath,
        "--salaries", salariesPath,
        "--output", resultPath};
    WindowJoin.main(programArgs);
}
package org.apache.flink.streaming.scala.examples.join
import java.io.Serializable
import java.util.Random
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.utils.ThrottledIterator
import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
import scala.collection.JavaConverters._
/**
 * Random sample-data generators used as input for the [[WindowJoin]] example.
 */
object WindowJoinSampleData {

  private[join] val NAMES = Array("tom", "jerry", "alice", "bob", "john", "grace")
  private[join] val GRADE_COUNT = 5
  private[join] val SALARY_MAX = 10000

  /**
   * Creates a stream of generated grades, throttled to `rate` elements per second.
   */
  def getGradeSource(env: StreamExecutionEnvironment, rate: Long): DataStream[Grade] = {
    val throttled = new ThrottledIterator(new GradeSource().asJava, rate)
    env.fromCollection(throttled.asScala)
  }

  /**
   * Creates a stream of generated salaries, throttled to `rate` elements per second.
   */
  def getSalarySource(env: StreamExecutionEnvironment, rate: Long): DataStream[Salary] = {
    val throttled = new ThrottledIterator(new SalarySource().asJava, rate)
    env.fromCollection(throttled.asScala)
  }

  // --------------------------------------------------------------------------

  /** Never-ending iterator of grades with a random name and a value in [1, GRADE_COUNT]. */
  class GradeSource extends Iterator[Grade] with Serializable {

    // seeded with this instance's hash code
    private[this] val rnd = new Random(hashCode())

    override def hasNext: Boolean = true

    override def next(): Grade = {
      val name = NAMES(rnd.nextInt(NAMES.length))
      val grade = rnd.nextInt(GRADE_COUNT) + 1
      Grade(name, grade)
    }
  }

  /** Never-ending iterator of salaries with a random name and a value in [1, SALARY_MAX]. */
  class SalarySource extends Iterator[Salary] with Serializable {

    // seeded with this instance's hash code
    private[this] val rnd = new Random(hashCode())

    override def hasNext: Boolean = true

    override def next(): Salary = {
      val name = NAMES(rnd.nextInt(NAMES.length))
      val salary = rnd.nextInt(SALARY_MAX) + 1
      Salary(name, salary)
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.test.exampleJavaPrograms.join;
/**
 * Sample input data for the window join examples.
 *
 * <p>Both constants hold newline-separated records of the form
 * {@code <number>,<name>,<value>} without any whitespace. The second field is a
 * person's name; the third field is a grade in {@link #GRADES_INPUT} and a salary
 * in {@link #SALARIES_INPUT}. Names must be spelled identically in both inputs so
 * that records can be matched on the name.
 */
public final class WindowJoinData {

    /** Grade records, one {@code <number>,<name>,<grade>} triple per line. */
    public static final String GRADES_INPUT = "0,john,5\n" + "0,tom,3\n" + "0,alice,1\n" + "0,grace,5\n" +
            "1,john,4\n" + "1,bob,1\n" + "1,alice,2\n" + "1,alice,3\n" + "1,bob,5\n" + "1,alice,3\n" + "1,tom,5\n" +
            "2,john,2\n" + "2,john,1\n" + "2,grace,2\n" + "2,jerry,2\n" + "2,tom,4\n" + "2,bob,4\n" + "2,bob,2\n" +
            // the first record of this row used to read "3, tom,2"; the stray space made the name " tom"
            "3,tom,2\n" + "3,alice,5\n" + "3,grace,5\n" + "3,grace,1\n" + "3,alice,1\n" + "3,grace,3\n" + "3,tom,1\n" +
            "4,jerry,5\n" + "4,john,3\n" + "4,john,4\n" + "4,john,1\n" + "4,jerry,3\n" + "4,grace,3\n" + "4,bob,3\n" +
            "5,john,3\n" + "5,jerry,4\n" + "5,tom,5\n" + "5,tom,4\n" + "5,john,2\n" + "5,jerry,1\n" + "5,bob,1\n" +
            "6,john,5\n" + "6,grace,4\n" + "6,tom,5\n" + "6,john,4\n" + "6,tom,1\n" + "6,grace,1\n" + "6,john,2\n" +
            "7,jerry,3\n" + "7,jerry,5\n" + "7,tom,2\n" + "7,tom,2\n" + "7,alice,4\n" + "7,tom,4\n" + "7,jerry,4\n" +
            "8,john,3\n" + "8,grace,4\n" + "8,tom,3\n" + "8,jerry,4\n" + "8,john,5\n" + "8,john,4\n" + "8,jerry,1\n" +
            "9,john,5\n" + "9,alice,2\n" + "9,tom,1\n" + "9,alice,5\n" + "9,grace,4\n" + "9,bob,4\n" + "9,jerry,1\n" +
            "10,john,5\n" + "10,tom,4\n" + "10,tom,5\n" + "10,jerry,5\n" + "10,tom,1\n" + "10,grace,3\n" + "10,bob,5\n" +
            "11,john,1\n" + "11,alice,1\n" + "11,grace,3\n" + "11,grace,1\n" + "11,jerry,1\n" + "11,jerry,4\n" +
            "12,bob,4\n" + "12,alice,3\n" + "12,tom,5\n" + "12,alice,4\n" + "12,alice,4\n" + "12,grace,4\n" + "12,john,5\n" +
            "13,john,5\n" + "13,grace,4\n" + "13,tom,4\n" + "13,john,4\n" + "13,john,5\n" + "13,alice,5\n" + "13,jerry,5\n" +
            "14,john,3\n" + "14,tom,5\n" + "14,jerry,4\n" + "14,grace,4\n" + "14,john,3\n" + "14,bob,2";

    /** Salary records, one {@code <number>,<name>,<salary>} triple per line. */
    public static final String SALARIES_INPUT = "0,john,6469\n" + "0,jerry,6760\n" + "0,jerry,8069\n" +
            "1,tom,3662\n" + "1,grace,8427\n" + "1,john,9425\n" + "1,bob,9018\n" + "1,john,352\n" + "1,tom,3770\n" +
            "2,grace,7622\n" + "2,jerry,7441\n" + "2,alice,1468\n" + "2,bob,5472\n" + "2,grace,898\n" +
            "3,tom,3849\n" + "3,grace,1865\n" + "3,alice,5582\n" + "3,john,9511\n" + "3,alice,1541\n" +
            "4,john,2477\n" + "4,grace,3561\n" + "4,john,1670\n" + "4,grace,7290\n" + "4,grace,6565\n" +
            "5,tom,6179\n" + "5,tom,1601\n" + "5,john,2940\n" + "5,bob,4685\n" + "5,bob,710\n" + "5,bob,5936\n" +
            "6,jerry,1412\n" + "6,grace,6515\n" + "6,grace,3321\n" + "6,tom,8088\n" + "6,john,2876\n" +
            "7,bob,9896\n" + "7,grace,7368\n" + "7,grace,9749\n" + "7,bob,2048\n" + "7,alice,4782\n" +
            "8,alice,3375\n" + "8,tom,5841\n" + "8,bob,958\n" + "8,bob,5258\n" + "8,tom,3935\n" + "8,jerry,4394\n" +
            "9,alice,102\n" + "9,alice,4931\n" + "9,alice,5240\n" + "9,jerry,7951\n" + "9,john,5675\n" +
            "10,bob,609\n" + "10,alice,5997\n" + "10,jerry,9651\n" + "10,alice,1328\n" + "10,bob,1022\n" +
            "11,grace,2578\n" + "11,jerry,9704\n" + "11,tom,4476\n" + "11,grace,3784\n" + "11,alice,6144\n" +
            "12,bob,6213\n" + "12,alice,7525\n" + "12,jerry,2908\n" + "12,grace,8464\n" + "12,jerry,9920\n" +
            "13,bob,3720\n" + "13,bob,7612\n" + "13,alice,7211\n" + "13,jerry,6484\n" + "13,alice,1711\n" +
            "14,jerry,5994\n" + "14,grace,928\n" + "14,jerry,2492\n" + "14,grace,9080\n" + "14,tom,4330\n" +
            "15,bob,8302\n" + "15,john,4981\n" + "15,tom,1781\n" + "15,grace,1379\n" + "15,jerry,3700\n" +
            "16,jerry,3584\n" + "16,jerry,2038\n" + "16,jerry,3902\n" + "16,tom,1336\n" + "16,jerry,7500\n" +
            "17,tom,3648\n" + "17,alice,2533\n" + "17,tom,8685\n" + "17,bob,3968\n" + "17,tom,3241\n" + "17,bob,7461\n" +
            "18,jerry,2138\n" + "18,alice,7503\n" + "18,alice,6424\n" + "18,tom,140\n" + "18,john,9802\n" +
            "19,grace,2977\n" + "19,grace,889\n" + "19,john,1338";

    /** Utility class, should not be instantiated. */
    private WindowJoinData() {}
}
......@@ -18,36 +18,64 @@
package org.apache.flink.streaming.test.exampleJavaPrograms.join;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.join.WindowJoin;
import org.apache.flink.streaming.examples.join.util.WindowJoinData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
public class WindowJoinITCase extends StreamingProgramTestBase {
import org.junit.Test;
protected String gradesPath;
protected String salariesPath;
protected String resultPath;
import java.io.File;
@Override
protected void preSubmit() throws Exception {
gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
resultPath = getTempDirPath("result");
}
@SuppressWarnings("serial")
public class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
@Test
public void testProgram() throws Exception {
final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
try {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> grades = env
.fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
.map(new Parser());
DataStream<Tuple2<String, Integer>> salaries = env
.fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
.map(new Parser());
WindowJoin
.runWindowJoin(grades, salaries, 100)
.writeAsText(resultPath, WriteMode.OVERWRITE);
env.execute();
@Override
protected void postSubmit() throws Exception {
// since the two sides of the join might have different speed
// the exact output can not be checked just whether it is well-formed
// checks that the result lines look like e.g. (bob, 2, 2015)
checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
// since the two sides of the join might run at different speeds,
// the exact output cannot be checked, only whether it is well-formed:
// every result line must look like e.g. (bob,2,2015)
checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
}
finally {
try {
FileUtils.deleteDirectory(new File(resultPath));
} catch (Throwable ignored) {}
}
}
//-------------------------------------------------------------------------
public static final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
@Override
protected void testProgram() throws Exception {
WindowJoin.main(new String[]{
"--grades", gradesPath,
"--salaries", salariesPath,
"--output", resultPath});
/**
 * Turns one comma-separated input line into a (name, value) pair.
 *
 * <p>The first field of the line is ignored; the second field becomes the name
 * and the third field is parsed as an integer.
 *
 * @param value the input line, expected to have at least three comma-separated fields
 * @return a tuple of the second field and the integer value of the third field
 * @throws Exception if the line has fewer than three fields or the third field is not an integer
 */
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
    final String[] columns = value.split(",");
    final String name = columns[1];
    final int number = Integer.parseInt(columns[2]);
    return new Tuple2<>(name, number);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.scala.examples
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.flink.streaming.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.scala.examples.join.WindowJoin
import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Person, Salary}
import org.apache.flink.streaming.test.exampleJavaPrograms.join.WindowJoinData
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
/**
 * Integration test for the Scala [[WindowJoin]] example.
 *
 * Feeds the sample grades and salaries through `WindowJoin.joinStreams`, writes
 * the joined records as text and checks that every output line is well-formed.
 */
class WindowJoinITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testProgram(): Unit = {
    // Keep the File handle for cleanup: the sink is given the URI string ("file:..."),
    // which is not a valid path for `new File(String)`, so deleting via that string
    // would silently miss the temp output.
    val resultFile: File = File.createTempFile("result-path", "dir")
    val resultPath: String = resultFile.toURI().toString()

    try {
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

      // Each input line is "<number>,<name>,<value>"; only name and value are used.
      val grades: DataStream[Grade] = env
        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
        .map( line => {
          val fields = line.split(",")
          Grade(fields(1), fields(2).toInt)
        })

      val salaries: DataStream[Salary] = env
        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
        .map( line => {
          val fields = line.split(",")
          Salary(fields(1), fields(2).toInt)
        })

      WindowJoin.joinStreams(grades, salaries, 100)
        .writeAsText(resultPath, WriteMode.OVERWRITE)

      env.execute()

      // The exact output is not checked, only that every line has the expected shape,
      // e.g. Person(bob,2,2015).
      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
    }
    finally {
      // Best-effort cleanup that never throws; removes the output whether it ended up
      // as a single file or as a directory.
      FileUtils.deleteQuietly(resultFile)
    }
  }
}
......@@ -29,7 +29,7 @@ import scala.util.Try
class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
@Test
def testTryTypeEquality: Unit = {
def testTryTypeEquality(): Unit = {
val TryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
val TryTypeInfo2 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
......@@ -38,18 +38,20 @@ class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
}
@Test
def testTryTypeInequality: Unit = {
def testTryTypeInequality(): Unit = {
val TryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
val TryTypeInfo2 = new TryTypeInfo[String, Try[String]](BasicTypeInfo.STRING_TYPE_INFO)
//noinspection ComparingUnrelatedTypes
assert(!TryTypeInfo1.equals(TryTypeInfo2))
}
@Test
def testTryTypeInequalityWithDifferentType: Unit = {
def testTryTypeInequalityWithDifferentType(): Unit = {
val TryTypeInfo = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
val genericTypeInfo = new GenericTypeInfo[Double](Double.getClass.asInstanceOf[Class[Double]])
//noinspection ComparingUnrelatedTypes
assert(!TryTypeInfo.equals(genericTypeInfo))
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册