From dba2946f465a72f18b6452e7ab34f9198b71a908 Mon Sep 17 00:00:00 2001 From: szape Date: Fri, 19 Jun 2015 10:43:10 +0200 Subject: [PATCH] [FLINK-2243] [storm-compat] Demonstrating finite Storm spout functionality on exclamation example -minor renaming -improving JavaDocs Closes #853 --- docs/apis/storm_compatibility.md | 51 ++++++++++ .../excamation/ExclamationTopology.java | 47 ++++++--- ...ion.java => ExclamationWithStormBolt.java} | 26 ++++- ...on.java => ExclamationWithStormSpout.java} | 42 ++++++-- .../excamation/StormExclamationLocal.java | 25 ++++- .../StormExclamationRemoteByClient.java | 22 +++++ .../StormExclamationRemoteBySubmitter.java | 21 ++++ .../util/FiniteStormFileSpout.java | 97 +++++++++++++++++++ ...ter.java => FiniteStormInMemorySpout.java} | 28 ++++-- .../util/OutputFormatter.java | 11 ++- .../util/SimpleOutputFormatter.java | 15 ++- .../wordcount/BoltTokenizerWordCount.java | 2 +- .../wordcount/SpoutSourceWordCount.java | 4 +- .../wordcount/StormWordCountLocal.java | 2 +- .../StormWordCountRemoteByClient.java | 2 +- .../StormWordCountRemoteBySubmitter.java | 2 +- .../wordcount/WordCountTopology.java | 2 +- ...va => ExclamationWithStormBoltITCase.java} | 6 +- ...a => ExclamationWithStormSpoutITCase.java} | 6 +- 19 files changed, 358 insertions(+), 53 deletions(-) rename flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/{StormBoltExclamation.java => ExclamationWithStormBolt.java} (78%) rename flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/{StormSpoutExclamation.java => ExclamationWithStormSpout.java} (69%) create mode 100644 flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java rename flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/{RawOutputFormatter.java => FiniteStormInMemorySpout.java} (55%) rename flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/{StormSpoutExclamationITCase.java => ExclamationWithStormBoltITCase.java} (86%) rename flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/{StormBoltExclamationITCase.java => ExclamationWithStormSpoutITCase.java} (86%) diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md index b8fe66ebc2a..1390b923586 100644 --- a/docs/apis/storm_compatibility.md +++ b/docs/apis/storm_compatibility.md @@ -167,6 +167,57 @@ The input type is `Tuple1` and `Fields("sentence")` specify that `input. See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples. +# Flink Extensions + +## Finite Storm Spouts + +In Flink streaming, sources can be finite - i.e. emit a finite number of records and stop after emitting the last record -, however, Storm spouts always emit infinite streams. +The bridge between the two approach is the `FiniteStormSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping-condition. +The user can create a finite Storm spout by implementing this interface instead of `IRichSpout`, and implementing the `reachedEnd()`method in addition. +When used as part of a Flink topology, a `FiniteStormSpout` should be wrapped in a `FiniteStormSpoutWrapper` class. + +Although finite Storm spouts are not necessary to embed Storm spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy: + + * to achieve that a native Storm spout behaves the same way as a finite Flink source with minimal modifications + * the user wants to process a stream only for some time; after that, the spout can stop automatically + * reading a file into a stream + * for testing purposes + +A `FiniteStormSpout` can be still used as a normal, infinite Storm spout by changing its wrapper class to `StormSpoutWraper` in the Flink topology. + +An example of a finite Storm spout that emits records for 10 seconds only: +
+
+~~~java +public class TimedFiniteStormSpout extends AbstractStormSpout implements FiniteStormSpout { + [...] + private long starttime = System.currentTimeMillis(); + + public boolean reachedEnd() { + return System.currentTimeMillis() - starttime > 10000l; + } + [...] +} +~~~ +
+
+ +Using a `FiniteStormSpout` in a Flink topology: +
+
+~~~java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +DataStream rawInput = env.addSource( + new FiniteStormSpoutWrapper(new TimedFiniteStormSpout(), true) + TypeExtractor.getForClass(String.class)); + +// process data stream +[...] +~~~ +
+
+ # Storm Compatibility Examples You can find more examples in Maven module `flink-storm-compatibilty-examples`. diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java index a5bb5711d31..b7c98a8e1c7 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java @@ -20,15 +20,32 @@ package org.apache.flink.stormcompatibility.excamation; import org.apache.flink.examples.java.wordcount.util.WordCountData; import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt; +import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout; +import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout; import org.apache.flink.stormcompatibility.util.OutputFormatter; -import org.apache.flink.stormcompatibility.util.RawOutputFormatter; +import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter; import org.apache.flink.stormcompatibility.util.StormBoltFileSink; import org.apache.flink.stormcompatibility.util.StormBoltPrintSink; -import org.apache.flink.stormcompatibility.util.StormFileSpout; -import org.apache.flink.stormcompatibility.util.StormInMemorySpout; /** - * This is a basic example of a Storm topology. + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}. + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path> + * <result path>
+ * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

*/ public class ExclamationTopology { @@ -36,7 +53,7 @@ public class ExclamationTopology { public final static String firstBoltId = "exclamation1"; public final static String secondBoltId = "exclamation2"; public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new RawOutputFormatter(); + private final static OutputFormatter formatter = new SimpleOutputFormatter(); public static FlinkTopologyBuilder buildTopology() { final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); @@ -46,9 +63,9 @@ public class ExclamationTopology { // read the text file from given input path final String[] tokens = textPath.split(":"); final String inputFile = tokens[tokens.length - 1]; - builder.setSpout(spoutId, new StormFileSpout(inputFile)); + builder.setSpout(spoutId, new FiniteStormFileSpout(inputFile)); } else { - builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS)); + builder.setSpout(spoutId, new FiniteStormInMemorySpout(WordCountData.WORDS)); } builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId); @@ -59,9 +76,11 @@ public class ExclamationTopology { // read the text file from given input path final String[] tokens = outputPath.split(":"); final String outputFile = tokens[tokens.length - 1]; - builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(secondBoltId); + builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)) + .shuffleGrouping(secondBoltId); } else { - builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(secondBoltId); + builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4) + .shuffleGrouping(secondBoltId); } return builder; @@ -84,13 +103,17 @@ public class ExclamationTopology { textPath = args[0]; outputPath = args[1]; } else { - System.err.println("Usage: StormExclamation* "); + System.err.println( + "Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] "); return false; } } else { - System.out.println("Executing StormExclamation* example with built-in default data"); + System.out.println("Executing StormExclamation example with built-in default data"); System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: StormExclamation* "); + System.out.println( + " Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] " + + " "); } return true; diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java similarity index 78% rename from flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java rename to flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java index 52e740c27ef..7bcb7f9816b 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java @@ -26,7 +26,25 @@ import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class StormBoltExclamation { +/** + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}. + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamationWithStormBolt <text path> <result path>
+ * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

    + *
  • use a Storm bolt within a Flink Streaming program
  • + *
+ */ +public class ExclamationWithStormBolt { // ************************************************************************* // PROGRAM @@ -90,13 +108,13 @@ public class StormBoltExclamation { textPath = args[0]; outputPath = args[1]; } else { - System.err.println("Usage: StormBoltExclamation "); + System.err.println("Usage: ExclamationWithStormBolt "); return false; } } else { - System.out.println("Executing StormBoltExclamation example with built-in default data"); + System.out.println("Executing ExclamationWithStormBolt example with built-in default data"); System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: StormBoltExclamation "); + System.out.println(" Usage: ExclamationWithStormBolt "); } return true; } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java similarity index 69% rename from flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java rename to flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java index 2569fa74bc0..f027eae165c 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java @@ -21,13 +21,32 @@ package org.apache.flink.stormcompatibility.excamation; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.util.StormFileSpout; -import org.apache.flink.stormcompatibility.util.StormInMemorySpout; -import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper; +import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout; +import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout; +import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class StormSpoutExclamation { +/** + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}. + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamationWithStormSpout <text path> <result path>
+ * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

    + *
  • use a Storm spout within a Flink Streaming program
  • + *
  • make use of the FiniteStormSpout interface
  • + *
+ */ +public class ExclamationWithStormSpout { // ************************************************************************* // PROGRAM @@ -89,13 +108,14 @@ public class StormSpoutExclamation { textPath = args[0]; outputPath = args[1]; } else { - System.err.println("Usage: StormSpoutExclamation "); + System.err.println("Usage: ExclamationWithStormSpout "); return false; } } else { - System.out.println("Executing StormSpoutExclamation example with built-in default data"); + System.out.println("Executing ExclamationWithStormSpout example with built-in default " + + "data"); System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: StormSpoutExclamation "); + System.out.println(" Usage: ExclamationWithStormSpout "); } return true; } @@ -106,12 +126,14 @@ public class StormSpoutExclamation { final String[] tokens = textPath.split(":"); final String localFile = tokens[tokens.length - 1]; return env.addSource( - new StormFiniteSpoutWrapper(new StormFileSpout(localFile), true), + new FiniteStormSpoutWrapper(new FiniteStormFileSpout(localFile), true), TypeExtractor.getForClass(String.class)).setParallelism(1); } - return env.addSource(new StormFiniteSpoutWrapper(new StormInMemorySpout(WordCountData.WORDS), true), - TypeExtractor.getForClass(String.class)); + return env.addSource( + new FiniteStormSpoutWrapper( + new FiniteStormInMemorySpout(WordCountData.WORDS), true), + TypeExtractor.getForClass(String.class)).setParallelism(1); } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java index a25e5e0a1f7..5941ff0c839 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java @@ -21,6 +21,27 @@ import backtype.storm.utils.Utils; import org.apache.flink.stormcompatibility.api.FlinkLocalCluster; import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; +/** + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and + * submitted to Flink for execution in the same way as to a Storm {@link LocalCluster}. + *

+ * This example shows how to run program directly within Java, thus it cannot be used to submit a + * {@link StormTopology} via Flink command line clients (ie, bin/flink). + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamationLocal <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

    + *
  • run a regular Storm program locally on Flink
  • + *
+ */ public class StormExclamationLocal { public final static String topologyId = "Streaming Exclamation"; @@ -43,10 +64,6 @@ public class StormExclamationLocal { cluster.submitTopology(topologyId, null, builder.createTopology()); Utils.sleep(10 * 1000); - - // TODO kill does no do anything so far - cluster.killTopology(topologyId); - cluster.shutdown(); } } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java index 3f55316f7e7..0f64301087d 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java @@ -25,6 +25,28 @@ import backtype.storm.utils.Utils; import org.apache.flink.stormcompatibility.api.FlinkClient; import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; +/** + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and + * submitted to Flink for execution in the same way as to a Storm cluster similar to + * {@link NimbusClient}. The Flink cluster can be local or remote. + *

+ * This example shows how to submit the program via Java, thus it cannot be used to submit a + * {@link StormTopology} via Flink command line clients (ie, bin/flink). + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamationRemoteByClient <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

    + *
  • submit a regular Storm program to a local or remote Flink cluster.
  • + *
+ */ public class StormExclamationRemoteByClient { public final static String topologyId = "Streaming Exclamation"; diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java index 728c5c79ac6..d5805200359 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java @@ -22,6 +22,27 @@ import org.apache.flink.stormcompatibility.api.FlinkClient; import org.apache.flink.stormcompatibility.api.FlinkSubmitter; import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; +/** + * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text + * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and + * submitted to Flink for execution in the same way as to a Storm cluster similar to + * {@link StormSubmitter}. The Flink cluster can be local or remote. + *

+ * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). + *

+ *

+ * The input is a plain text file with lines separated by newline characters. + *

+ *

+ * Usage: StormExclamationRemoteByClient <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + *

+ *

+ * This example shows how to: + *

    + *
  • submit a regular Storm program to a local or remote Flink cluster.
  • + *
+ */ public class StormExclamationRemoteBySubmitter { public final static String topologyId = "Streaming Exclamation"; diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java new file mode 100644 index 00000000000..d45ad7603ba --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.util; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Values; +import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.Map; + +/** + * Implements a Storm Spout that reads data from a given local file. The spout stops automatically + * when it reached the end of the file. + */ +public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteStormSpout { + private static final long serialVersionUID = -6996907090003590436L; + + private final String path; + private BufferedReader reader; + private String line; + private boolean newLineRead; + + public FiniteStormFileSpout(final String path) { + this.path = path; + } + + @SuppressWarnings("rawtypes") + @Override + public void open(final Map conf, final TopologyContext context, + final SpoutOutputCollector collector) { + super.open(conf, context, collector); + try { + this.reader = new BufferedReader(new FileReader(this.path)); + } catch (final FileNotFoundException e) { + throw new RuntimeException(e); + } + newLineRead = false; + } + + @Override + public void close() { + if (this.reader != null) { + try { + this.reader.close(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void nextTuple() { + this.collector.emit(new Values(line)); + newLineRead = false; + } + + /** + * Can be called before nextTuple() any times including 0. + */ + public boolean reachedEnd() { + try { + readLine(); + } catch (IOException e) { + throw new RuntimeException("Exception occured while reading file " + path); + } + return line == null; + } + + private void readLine() throws IOException { + if (!newLineRead) { + line = reader.readLine(); + newLineRead = true; + } + } + +} diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java similarity index 55% rename from flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java rename to flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java index 7faf6cd3e89..899c569db2a 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java @@ -18,15 +18,31 @@ package org.apache.flink.stormcompatibility.util; -import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout; -public class RawOutputFormatter implements OutputFormatter { - private static final long serialVersionUID = 8685668993521259832L; +/** + * Implements a Storm Spout that reads String[] data stored in the memory. The spout stops + * automatically when it emitted all of the data. + */ +public class FiniteStormInMemorySpout extends AbstractStormSpout implements FiniteStormSpout { + + private static final long serialVersionUID = -4008858647468647019L; + + private String[] source; + private int counter = 0; + + public FiniteStormInMemorySpout(String[] source) { + this.source = source; + } @Override - public String format(final Tuple input) { - assert (input.size() == 1); - return input.getValue(0).toString(); + public void nextTuple() { + this.collector.emit(new Values(source[this.counter++])); + } + + public boolean reachedEnd() { + return counter >= source.length; } } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java index bfc3135db65..ec9adfe841e 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java @@ -18,12 +18,19 @@ package org.apache.flink.stormcompatibility.util; -import java.io.Serializable; - import backtype.storm.tuple.Tuple; +import java.io.Serializable; + public interface OutputFormatter extends Serializable { + /** + * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output + * tuples before writing them out to a file or to the consol. + * + * @param input The tuple to be formatted + * @return The string result of the formatting + */ public String format(Tuple input); } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java index ccb617baa9f..0702e947ea0 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java @@ -23,9 +23,20 @@ import backtype.storm.tuple.Tuple; public class SimpleOutputFormatter implements OutputFormatter { private static final long serialVersionUID = 6349573860144270338L; + /** + * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that + * field. This method is used for formatting raw outputs wrapped in tuples, before writing them + * out to a file or to the consol. + * + * @param input + * The tuple to be formatted + * @return The string result of the formatting + */ @Override public String format(final Tuple input) { - return input.getValues().toString(); + if (input.getValues().size() != 1) { + throw new RuntimeException("The output is not raw"); + } + return input.getValue(0).toString(); } - } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java index 8f4503f8de2..eab58f58c8e 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java @@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; *

* This example shows how to: *

    - *
  • use a Storm bolt within a Flink Streaming program. + *
  • use a Storm bolt within a Flink Streaming program.
  • *
*/ public class BoltTokenizerWordCount { diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java index 361d83af256..4c012d80e68 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java @@ -43,7 +43,7 @@ import org.apache.flink.util.Collector; *

* This example shows how to: *

    - *
  • use a Storm bolt within a Flink Streaming program. + *
  • use a Storm spout within a Flink Streaming program.
  • *
*/ public class SpoutSourceWordCount { @@ -145,7 +145,7 @@ public class SpoutSourceWordCount { } return env.addSource(new StormFiniteSpoutWrapper(new StormInMemorySpout(WordCountData.WORDS), true), - TypeExtractor.getForClass(String.class)); + TypeExtractor.getForClass(String.class)).setParallelism(1); } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java index 3fbd5b73043..836c8e945c1 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java @@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; *

* This example shows how to: *

    - *
  • run a regular Storm program locally on Flink + *
  • run a regular Storm program locally on Flink
  • *
*/ public class StormWordCountLocal { diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java index 9e56c143578..0bbe11b28e0 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java @@ -46,7 +46,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; *

* This example shows how to: *

    - *
  • submit a regular Storm program to a local or remote Flink cluster. + *
  • submit a regular Storm program to a local or remote Flink cluster.
  • *
*/ public class StormWordCountRemoteByClient { diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java index a1fb79d00e6..264dc415b0a 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java @@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; *

* This example shows how to: *

    - *
  • submit a regular Storm program to a local or remote Flink cluster. + *
  • submit a regular Storm program to a local or remote Flink cluster.
  • *
*/ public class StormWordCountRemoteBySubmitter { diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java index f028266585c..367ca9e660e 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java @@ -47,7 +47,7 @@ import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTok *

* This example shows how to: *

    - *
  • how to construct a regular Storm topology as Flink program + *
  • how to construct a regular Storm topology as Flink program
  • *
*/ public class WordCountTopology { diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java similarity index 86% rename from flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java rename to flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java index 2b08b4b8bfb..930f87bced7 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java @@ -18,12 +18,12 @@ package org.apache.flink.stormcompatibility.exclamation; -import org.apache.flink.stormcompatibility.excamation.StormSpoutExclamation; +import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt; import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class StormSpoutExclamationITCase extends StreamingProgramTestBase { +public class ExclamationWithStormBoltITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; @@ -41,7 +41,7 @@ public class StormSpoutExclamationITCase extends StreamingProgramTestBase { @Override protected void testProgram() throws Exception { - StormSpoutExclamation.main(new String[]{this.textPath, this.resultPath}); + ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath}); } } diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java similarity index 86% rename from flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java rename to flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java index 75dd5fc6a78..4c515ce147d 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java @@ -18,12 +18,12 @@ package org.apache.flink.stormcompatibility.exclamation; -import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation; +import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout; import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData; import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; -public class StormBoltExclamationITCase extends StreamingProgramTestBase { +public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase { protected String textPath; protected String resultPath; @@ -41,7 +41,7 @@ public class StormBoltExclamationITCase extends StreamingProgramTestBase { @Override protected void testProgram() throws Exception { - StormBoltExclamation.main(new String[]{this.textPath, this.resultPath}); + ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath}); } } -- GitLab