diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java new file mode 100644 index 0000000000000000000000000000000000000000..bfd3cd7f0333d476a985c3e535a3a30524291ca5 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.sideoutput; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.OutputTag; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +/** + * An example that illustrates the use of side outputs. + * + *

This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount} + * that has a filter in the tokenizer and only emits some words for counting + * while emitting the other words to a side output. + */ +public class SideOutputExample { + + /** + * We need to create an {@link OutputTag} so that we can reference it when emitting + * data to a side output and also to retrieve the side output stream from an operation. + */ + static final OutputTag rejectedWordsTag = new OutputTag("rejected") {}; + + public static void main(String[] args) throws Exception { + + // Checking input parameters + final ParameterTool params = ParameterTool.fromArgs(args); + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(params); + + // get input data + DataStream text; + if (params.has("input")) { + // read the text file from given input path + text = env.readTextFile(params.get("input")); + } else { + System.out.println("Executing WordCount example with default input data set."); + System.out.println("Use --input to specify file input."); + // get default test text data + text = env.fromElements(WordCountData.WORDS); + } + + SingleOutputStreamOperator> tokenized = text + .keyBy(new KeySelector() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(String value) throws Exception { + return 0; + } + }) + .process(new Tokenizer()); + + DataStream rejectedWords = tokenized + .getSideOutput(rejectedWordsTag) + .map(new MapFunction() { + private static final long serialVersionUID = 1L; + + @Override + public String map(String value) throws Exception { + return "rejected: " + value; + } + }); + + DataStream> counts = tokenized + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + // group by the tuple field "0" and sum up tuple field "1" + .sum(1); + + // emit result + if (params.has("output")) { + counts.writeAsText(params.get("output")); + rejectedWords.writeAsText(params.get("rejected-words-output")); + } else { + System.out.println("Printing result to stdout. Use --output to specify output path."); + counts.print(); + rejectedWords.print(); + } + + // execute program + env.execute("Streaming WordCount SideOutput"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a + * user-defined FlatMapFunction. The function takes a line (String) and + * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2}). + * + *

This rejects words that are longer than 5 characters long. + */ + public static final class Tokenizer extends ProcessFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + String value, + Context ctx, + Collector> out) throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 5) { + ctx.output(rejectedWordsTag, token); + } else if (token.length() > 0) { + out.collect(new Tuple2<>(token, 1)); + } + } + + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java index 48418af4a0160e6a3640985a4c82dc8de829ff1c..43683eff9e2240257e400ac97729de6d6348795f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * A function that processes elements of a stream. @@ -101,6 +102,14 @@ public abstract class ProcessFunction extends AbstractRichFunction { * A {@link TimerService} for querying time and registering timers. */ public abstract TimerService timerService(); + + /** + * Emits a record to the side output identified by the {@link OutputTag}. + * + * @param outputTag the {@code OutputTag} that identifies the side output to emit to. + * @param value The record to emit. + */ + public abstract void output(OutputTag outputTag, X value); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java index d8dfb0fa7a58dd76dc4591e486c09fa6cf874bd1..97bc8ce7651b07cfc1f389bd6d49a45ad1689a6b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -38,9 +39,9 @@ public class KeyedProcessOperator private transient TimestampedCollector collector; - private transient ContextImpl context; + private transient ContextImpl context; - private transient OnTimerContextImpl onTimerContext; + private transient OnTimerContextImpl onTimerContext; public KeyedProcessOperator(ProcessFunction function) { super(function); @@ -58,8 +59,8 @@ public class KeyedProcessOperator TimerService timerService = new SimpleTimerService(internalTimerService); - context = new ContextImpl<>(userFunction, timerService); - onTimerContext = new OnTimerContextImpl<>(userFunction, timerService); + context = new ContextImpl(userFunction, timerService); + onTimerContext = new OnTimerContextImpl(userFunction, timerService); } @Override @@ -90,7 +91,7 @@ public class KeyedProcessOperator context.element = null; } - private static class ContextImpl extends ProcessFunction.Context { + private class ContextImpl extends ProcessFunction.Context { private final TimerService timerService; @@ -112,13 +113,18 @@ public class KeyedProcessOperator } } + @Override + public void output(OutputTag outputTag, X value) { + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + @Override public TimerService timerService() { return timerService; } } - private static class OnTimerContextImpl extends ProcessFunction.OnTimerContext{ + private class OnTimerContextImpl extends ProcessFunction.OnTimerContext{ private final TimerService timerService; @@ -143,6 +149,11 @@ public class KeyedProcessOperator return timer.getTimestamp(); } + @Override + public void output(OutputTag outputTag, X value) { + output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp())); + } + @Override public TimerService timerService() { return timerService; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java index 13b68f49225bd1eca756d6719e9d54c65b05be11..f73b610288b6e58bb249ff2b9f0285f6365d70f8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.OutputTag; import static org.apache.flink.util.Preconditions.checkState; @@ -92,6 +93,11 @@ public class ProcessOperator } } + @Override + public void output(OutputTag outputTag, X value) { + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + @Override public long currentProcessingTime() { return processingTimeService.getCurrentProcessingTime(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index 2f92897c57607d4c9e788238462123d6f8540149..f87ee30fffcbfdf2736a37135417af5634c9effe 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.OutputTag; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -57,6 +58,151 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase { elements.add(4); } + private final static OutputTag sideOutputTag = new OutputTag("side"){}; + private final static OutputTag otherSideOutputTag = new OutputTag("other-side"){}; + + /** + * Test ProcessFunction side output. + */ + @Test + public void testProcessFunctionSideOutput() throws Exception { + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); + TestListResultSink resultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(3); + + DataStream dataStream = see.fromCollection(elements); + + SingleOutputStreamOperator passThroughtStream = dataStream + .process(new ProcessFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Integer value, Context ctx, Collector out) throws Exception { + out.collect(value); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); + } + }); + + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + passThroughtStream.addSink(resultSink); + see.execute(); + + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); + } + + /** + * Test keyed ProcessFunction side output. + */ + @Test + public void testKeyedProcessFunctionSideOutput() throws Exception { + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); + TestListResultSink resultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(3); + + DataStream dataStream = see.fromCollection(elements); + + SingleOutputStreamOperator passThroughtStream = dataStream + .keyBy(new KeySelector() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }) + .process(new ProcessFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Integer value, Context ctx, Collector out) throws Exception { + out.collect(value); + ctx.output(sideOutputTag, "sideout-" + String.valueOf(value)); + } + }); + + passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + passThroughtStream.addSink(resultSink); + see.execute(); + + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); + } + + + /** + * Test ProcessFunction side outputs with wrong {@code OutputTag}. + */ + @Test + public void testProcessFunctionSideOutputWithWrongTag() throws Exception { + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(3); + + DataStream dataStream = see.fromCollection(elements); + + dataStream + .process(new ProcessFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Integer value, Context ctx, Collector out) throws Exception { + out.collect(value); + ctx.output(otherSideOutputTag, "sideout-" + String.valueOf(value)); + } + }).getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + + see.execute(); + + assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult()); + } + + /** + * Test keyed ProcessFunction side outputs with wrong {@code OutputTag}. + */ + @Test + public void testKeyedProcessFunctionSideOutputWithWrongTag() throws Exception { + TestListResultSink sideOutputResultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(3); + + DataStream dataStream = see.fromCollection(elements); + + dataStream + .keyBy(new KeySelector() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }) + .process(new ProcessFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Integer value, Context ctx, Collector out) throws Exception { + out.collect(value); + ctx.output(otherSideOutputTag, "sideout-" + String.valueOf(value)); + } + }).getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + + see.execute(); + + assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult()); + } + + private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks { private static final long serialVersionUID = 1L;