提交 88636799 编写于 作者: M Maximilian Michels

[FLINK-2837][storm] various improvements for the compatibility layer

- refactor to use Storm's topology builder
- remove FlinkTopologyBuilder
- instantiate context-based StreamExecutionEnvironment (local or remote)
- remove some of the Flink and Storm behavior replicating classes
- modify FlinkTopology to parse Storm topology directly
- replace StormTestBase with StreamingTestBase
- add print example
- FlinkTopologyBuilder changes (check if all inputs are available before processing)
- correct package typo
- two input support
- add join example
- update docs

This closes #1398.
上级 20fe2af8
......@@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t
Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
- `StormSubmitter` replaced by `FlinkSubmitter`
- `NimbusClient` and `Client` replaced by `FlinkClient`
- `LocalCluster` replaced by `FlinkLocalCluster`
In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*.
If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively.
If a parameter is not specified, the value is taken from `flink-conf.yaml`.
The actual runtime code, i.e., Spouts and Bolts, can be used *unmodified*.
If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new FlinkTopology();
TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
......@@ -81,12 +79,12 @@ builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("count
Config conf = new Config();
if(runLocal) { // submit to test cluster
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", conf, builder.createTopology());
cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
// optional
// conf.put(Config.NIMBUS_HOST, "remoteHost");
// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
......@@ -61,6 +61,14 @@ under the License.
......@@ -226,7 +234,7 @@ under the License.
<!-- WordCount Storm topology-->
<!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
<!-- Example for whole topologies (i.e., if FlinkTopology is used) -->
<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
However, we excluded 'defaults.yaml' in dependency-plugin to get a clean Eclipse environment.
Thus, 'defaults.yaml' is not available for maven-jar-plugin.
......@@ -15,14 +15,14 @@
* limitations under the License.
package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
* Implements the "Exclamation" program that attaches five exclamation marks to every line of a text file in a streaming
......@@ -58,15 +58,17 @@ public class ExclamationLocal {
// build Topology the Storm way
final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
final TopologyBuilder builder = ExclamationTopology.buildTopology();
// execute program locally
Config conf = new Config();
conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, conf, builder.createTopology());
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
......@@ -15,17 +15,17 @@
* limitations under the License.
package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.FiniteFileSpout;
import org.apache.flink.storm.util.FiniteInMemorySpout;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.SimpleOutputFormatter;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
* Implements the "Exclamation" program that attaches two exclamation marks to every line of a text file in a streaming
......@@ -51,8 +51,8 @@ public class ExclamationTopology {
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new SimpleOutputFormatter();
public static FlinkTopologyBuilder buildTopology() {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
// get input data
if (fileInputOutput) {
......@@ -16,19 +16,18 @@
* limitations under the License.
package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import backtype.storm.utils.Utils;
* Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text file in a streaming
* fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
......@@ -16,8 +16,9 @@
* limitations under the License.
package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
......@@ -28,8 +29,6 @@ import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import backtype.storm.utils.Utils;
* Implements the "Exclamation" program that attaches six exclamation marks to every line of a text file in a streaming
* fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
......@@ -16,7 +16,7 @@
* limitations under the License.
package org.apache.flink.storm.excamation.operators;
package org.apache.flink.storm.exclamation.operators;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.join;
import backtype.storm.Config;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.TupleOutputFormatter;
import storm.starter.bolt.PrinterBolt;
import storm.starter.bolt.SingleJoinBolt;
public class SingleJoinExample {
public static void main(String[] args) throws Exception {
final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender", "hobbies"));
final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("gender", genderSpout);
builder.setSpout("age", ageSpout);
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
.fieldsGrouping("gender", new Fields("id"))
.fieldsGrouping("age", new Fields("id"));
// emit result
if (args.length > 0) {
// write the join result to a file at the given output path
builder.setBolt("fileOutput", new BoltFileSink(args[0], new TupleOutputFormatter()))
} else {
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("join");
Config conf = new Config();
String[] hobbies = new String[] {"reading", "biking", "travelling", "watching tv"};
for (int i = 0; i < 10; i++) {
String gender;
if (i % 2 == 0) {
gender = "male";
else {
gender = "female";
genderSpout.feed(new Values(i, gender, hobbies[i % hobbies.length]));
for (int i = 9; i >= 0; i--) {
ageSpout.feed(new Values(i, i + 20));
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.print;
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import storm.starter.bolt.PrinterBolt;
import storm.starter.spout.TwitterSampleSpout;
import java.util.Arrays;
* Prints incoming tweets. Tweets can be filtered by keywords.
public class PrintSampleStream {
public static void main(String[] args) throws Exception {
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];
// keywords start with the 5th parameter
String[] keyWords = Arrays.copyOfRange(args, 4, args.length);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
accessToken, accessTokenSecret, keyWords));
builder.setBolt("print", new PrinterBolt())
Config conf = new Config();
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("Print", conf, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
......@@ -23,8 +23,6 @@ import backtype.storm.tuple.Values;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.storm.util.FiniteSpout;
* Implements a Spout that reads data from a given local file. The spout stops automatically
* when it reaches the end of the file.
......@@ -18,9 +18,6 @@
package org.apache.flink.storm.util;
import org.apache.flink.storm.util.FiniteSpout;
* Implements a Spout that reads String[] data stored in memory. The Spout stops automatically when it has emitted all of
* the data.
......@@ -18,7 +18,6 @@
package org.apache.flink.storm.wordcount;
import backtype.storm.topology.IRichBolt;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
......@@ -18,7 +18,6 @@
package org.apache.flink.storm.wordcount;
import backtype.storm.topology.IRichBolt;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -19,7 +19,6 @@ package org.apache.flink.storm.wordcount;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
......@@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
......@@ -35,7 +35,7 @@ import org.apache.flink.storm.api.FlinkTopologyBuilder;
* <p>
* The input is a plain text file with lines separated by newline characters.
* <p>
* Usage: <code>WordCountLocal &lt;text path&gt; &lt;result path&gt;</code><br>
* Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
* <p>
* This example shows how to:
......@@ -57,16 +57,13 @@ public class WordCountLocal {
// build Topology the Storm way
final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
final TopologyBuilder builder = WordCountTopology.buildTopology();
// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
// TODO kill does not do anything so far
......@@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
......@@ -58,17 +58,14 @@ public class WordCountLocalByName {
// build Topology the Storm way
final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false);
final TopologyBuilder builder = WordCountTopology.buildTopology(false);
// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
// TODO kill does not do anything so far
......@@ -22,12 +22,13 @@ import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
......@@ -63,7 +64,7 @@ public class WordCountRemoteByClient {
// build Topology the Storm way
final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
final TopologyBuilder builder = WordCountTopology.buildTopology();
// execute program on Flink cluster
final Config conf = new Config();
......@@ -73,7 +74,7 @@ public class WordCountRemoteByClient {
conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
cluster.submitTopology(topologyId, uploadedJarLocation, FlinkTopology.createTopology(builder));
Utils.sleep(5 * 1000);
......@@ -21,10 +21,11 @@ import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
......@@ -57,7 +58,7 @@ public class WordCountRemoteBySubmitter {
// build Topology the Storm way
final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
final TopologyBuilder builder = WordCountTopology.buildTopology();
// execute program on Flink cluster
final Config conf = new Config();
......@@ -71,7 +72,7 @@ public class WordCountRemoteBySubmitter {
// The user jar file must be specified via JVM argument if executed via Java.
// => -Dstorm.jar=target/WordCount-StormTopology.jar
// If bin/flink is used, the jar file is detected automatically.
FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
FlinkSubmitter.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
Thread.sleep(5 * 1000);
......@@ -18,13 +18,12 @@
package org.apache.flink.storm.wordcount;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
import org.apache.flink.storm.wordcount.operators.BoltCounter;
import org.apache.flink.storm.wordcount.operators.BoltCounterByName;
......@@ -55,13 +54,13 @@ public class WordCountTopology {
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new TupleOutputFormatter();
public static FlinkTopologyBuilder buildTopology() {
public static TopologyBuilder buildTopology() {
return buildTopology(true);
public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
public static TopologyBuilder buildTopology(boolean indexOrName) {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
final TopologyBuilder builder = new TopologyBuilder();
// get input data
if (fileInputOutput) {
......@@ -17,16 +17,15 @@
package org.apache.flink.storm.wordcount.operators;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.util.InMemorySpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.util.FiniteInMemorySpout;
* Implements a Spout that reads data from {@link WordCountData#WORDS}.
public final class WordCountInMemorySpout extends InMemorySpout<String> {
public final class WordCountInMemorySpout extends FiniteInMemorySpout {
private static final long serialVersionUID = 8832143302409465843L;
public WordCountInMemorySpout() {
......@@ -18,12 +18,11 @@
package org.apache.flink.storm.exclamation;
import org.apache.flink.storm.excamation.ExclamationWithBolt;
import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class ExclamationWithBoltITCase extends StormTestBase {
public class ExclamationWithBoltITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,12 +18,11 @@
package org.apache.flink.storm.exclamation;
import org.apache.flink.storm.excamation.ExclamationWithSpout;
import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class ExclamationWithSpoutITCase extends StormTestBase {
public class ExclamationWithSpoutITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,12 +18,11 @@
package org.apache.flink.storm.exclamation;
import org.apache.flink.storm.excamation.ExclamationLocal;
import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class StormExclamationLocalITCase extends StormTestBase {
public class StormExclamationLocalITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -15,15 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.api;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
package org.apache.flink.storm.join;
import backtype.storm.generated.StormTopology;
import com.google.common.base.Joiner;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class SingleJoinITCase extends StreamingProgramTestBase {
protected static String expectedOutput[] = {
protected String resultPath;
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
public class TestTopologyBuilder extends FlinkTopologyBuilder {
public StormTopology getStormTopology() {
return super.getStormTopology();
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath);
protected void testProgram() throws Exception {
// We need to remove the file scheme because we can't use the Flink file system.
// (to remain compatible with Storm)
SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") });
......@@ -17,12 +17,12 @@
package org.apache.flink.storm.split;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
public class SplitBoltTopology {
......@@ -33,8 +33,8 @@ public class SplitBoltTopology {
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new TupleOutputFormatter();
public static FlinkTopologyBuilder buildTopology() {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(spoutId, new RandomSpout(false, seed));
builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId);
......@@ -17,12 +17,12 @@
package org.apache.flink.storm.split;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
public class SplitSpoutTopology {
......@@ -32,8 +32,8 @@ public class SplitSpoutTopology {
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new TupleOutputFormatter();
public static FlinkTopologyBuilder buildTopology() {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(spoutId, new RandomSpout(true, seed));
builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
......@@ -16,10 +16,11 @@
package org.apache.flink.storm.split;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitStreamBoltLocal {
public final static String topologyId = "Bolt split stream example";
......@@ -35,16 +36,13 @@ public class SplitStreamBoltLocal {
// build Topology the Storm way
final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology();
final TopologyBuilder builder = SplitBoltTopology.buildTopology();
// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(5 * 1000);
Utils.sleep(10 * 1000);
// TODO kill does not do anything so far
......@@ -16,10 +16,11 @@
package org.apache.flink.storm.split;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitStreamSpoutLocal {
public final static String topologyId = "Spout split stream example";
......@@ -35,16 +36,13 @@ public class SplitStreamSpoutLocal {
// build Topology the Storm way
final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology();
final TopologyBuilder builder = SplitSpoutTopology.buildTopology();
// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(5 * 1000);
Utils.sleep(10 * 1000);
// TODO kill does not do anything so far
......@@ -17,17 +17,18 @@
package org.apache.flink.storm.tests;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.TaskIdBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class StormFieldsGroupingITCase extends StormTestBase {
public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
private final static String topologyId = "FieldsGrouping Test";
private final static String spoutId = "spout";
......@@ -52,7 +53,7 @@ public class StormFieldsGroupingITCase extends StormTestBase {
final String[] tokens = this.resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
......@@ -60,7 +61,7 @@ public class StormFieldsGroupingITCase extends StormTestBase {
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
......@@ -27,6 +27,9 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
* Bolt to prepend all incoming tuple values with the task id.
public class TaskIdBolt extends BaseRichBolt {
private static final long serialVersionUID = -7966475984592762720L;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.util;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;
import static org.junit.Assert.fail;
* Base class for Storm tests.
public abstract class StormTestBase extends AbstractTestBase {
public static final int DEFAULT_PARALLELISM = 4;
public StormTestBase() {
this(new Configuration());
public StormTestBase(Configuration config) {
super(config, StreamingMode.STREAMING);
// ------------------------------------------------------------------------
// Methods to create the test program and for pre- and post- test work
// ------------------------------------------------------------------------
protected abstract void testProgram() throws Exception;
protected void preSubmit() throws Exception {}
protected void postSubmit() throws Exception {}
// ------------------------------------------------------------------------
// Test entry point
// ------------------------------------------------------------------------
public void testJob() throws Exception {
try {
// pre-submit
try {
catch (Exception e) {
fail("Pre-submit work caused an error: " + e.getMessage());
// prepare the test environment
// we need to initialize the stream test environment, and the storm local cluster
TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM);
FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
public FlinkLocalCluster createLocalCluster() {
return new FlinkLocalCluster(executor);
// call the test program
try {
catch (Exception e) {
fail("Error while calling the test program: " + e.getMessage());
// post-submit
try {
catch (Exception e) {
fail("Post-submit work caused an error: " + e.getMessage());
finally {
// reset the FlinkLocalCluster to its default behavior
FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());
// reset the StreamExecutionEnvironment to its default behavior
// clean up all resources
......@@ -18,11 +18,10 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class BoltTokenizerWordCountITCase extends StormTestBase {
public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,11 +18,10 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojo;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,11 +18,10 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNames;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,11 +18,10 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class SpoutSourceWordCountITCase extends StormTestBase {
public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,11 +18,10 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.WordCountLocal;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class WordCountLocalITCase extends StormTestBase {
public class WordCountLocalITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -18,11 +18,11 @@
package org.apache.flink.storm.wordcount;
import org.apache.flink.storm.util.StormTestBase;
import org.apache.flink.storm.wordcount.WordCountLocalByName;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
public class WordCountLocalNamedITCase extends StormTestBase {
public class WordCountLocalNamedITCase extends StreamingProgramTestBase {
protected String textPath;
protected String resultPath;
......@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
......@@ -183,10 +183,10 @@ public class FlinkClient {
/* set storm configuration */
if (this.conf != null) {
topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf));
final StreamGraph streamGraph = topology.getStreamGraph();
final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
final JobGraph jobGraph = streamGraph.getJobGraph();
......@@ -26,6 +26,7 @@ import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -48,12 +49,10 @@ public class FlinkLocalCluster {
private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
/** The flink mini cluster on which to execute the programs */
private final FlinkMiniCluster flink;
private FlinkMiniCluster flink;
public FlinkLocalCluster() {
this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
public FlinkLocalCluster(FlinkMiniCluster flink) {
......@@ -71,13 +70,26 @@ public class FlinkLocalCluster {
LOG.info("Running Storm topology on FlinkLocalCluster");
if(conf != null) {
topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf));
StreamGraph streamGraph = topology.getStreamGraph();
StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();
if (flink == null) {
Configuration configuration = new Configuration();
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
flink = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
......@@ -99,6 +111,7 @@ public class FlinkLocalCluster {
public void shutdown() {
flink = null;
public String getTopologyConf(final String id) {
......@@ -20,7 +20,6 @@ package org.apache.flink.storm.api;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
......@@ -15,75 +16,473 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.api;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
* {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
* StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
* cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
* {@link FlinkClient}.
* {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
* <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
public class FlinkTopology extends StreamExecutionEnvironment {
public class FlinkTopology {
/** All declared streams and output schemas by operator ID */
private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
/** All spouts&bolts declarers by their ID */
private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
/** The number of declared tasks for the whole program (i.e., the sum over all degrees of parallelism) */
private int numberOfTasks = 0;
private final TopologyBuilder builder;
public FlinkTopology() {
// Set default parallelism to 1, to mirror Storm default behavior
// needs to be a class member for internal testing purpose
private final StormTopology stormTopology;
private final Map<String, IRichSpout> spouts;
private final Map<String, IRichBolt> bolts;
private final StreamExecutionEnvironment env;
/**
 * Builds a FlinkTopology from a Storm {@link TopologyBuilder}. The constructor is private;
 * instances are obtained through the static createTopology(TopologyBuilder) factory.
 *
 * @param builder the Storm builder whose topology, spouts and bolts are captured here
 */
private FlinkTopology(TopologyBuilder builder) {
this.builder = builder;
// materialize the Storm topology once; it is consulted later for the components' parallelism hints
this.stormTopology = builder.createTopology();
// extract the spouts and bolts
// (the "_spouts" / "_bolts" fields are looked up reflectively on the builder's class and the maps are copied)
this.spouts = getPrivateField("_spouts");
this.bolts = getPrivateField("_bolts");
// environment chosen by StreamExecutionEnvironment.getExecutionEnvironment() for the current context
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kick off the translation immediately
// NOTE(review): the call this comment announces is not visible in this excerpt - presumably translateTopology(); confirm
* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
* FlinkClient}.
* @throws UnsupportedOperationException
* at every invocation
* Creates a Flink program that uses the specified spouts and bolts.
* @param stormBuilder The Storm topology builder to use for creating the Flink topology.
* @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
public JobExecutionResult execute() throws Exception {
throw new UnsupportedOperationException(
"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
return new FlinkTopology(stormBuilder);
* Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
* FlinkClient}.
* @throws UnsupportedOperationException
* at every invocation
* Returns the underlying Flink {@link StreamExecutionEnvironment} for the Storm topology.
* @return The contextual environment (local or remote).
public JobExecutionResult execute(final String jobName) throws Exception {
throw new UnsupportedOperationException(
"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
public StreamExecutionEnvironment getExecutionEnvironment() {
return this.env;
* Increases the number of declared tasks of this program by the given value.
* @param dop
* The dop of a new operator that increases the number of overall tasks.
* Directly executes the Storm topology based on the current context (local when in IDE and
* remote when executed through ./bin/flink).
* @return The Flink {@link JobExecutionResult} after the execution of the Storm topology.
* @throws Exception which occurs during execution of the translated Storm topology.
public void increaseNumberOfTasks(final int dop) {
assert (dop > 0);
this.numberOfTasks += dop;
public JobExecutionResult execute() throws Exception {
return env.execute();
/**
 * Reads a map-valued field of the wrapped {@link TopologyBuilder} by name via reflection and
 * returns a copy of it (the copy is produced by copyObject).
 *
 * @param field name of a field declared on the builder's class, e.g. "_spouts" or "_bolts"
 * @return a copy of the map stored in that field
 * @throws RuntimeException if the field does not exist or cannot be accessed; the original
 *         exception is kept as the cause
 */
private <T> Map<String, T> getPrivateField(String field) {
try {
Field f = builder.getClass().getDeclaredField(field);
// NOTE(review): no f.setAccessible(true) is visible in this excerpt; without it f.get() on a
// private field would throw IllegalAccessException - confirm against the full file
// unchecked cast: nothing here verifies that the field really holds a Map<String, T>
return copyObject((Map<String, T>) f.get(builder));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
/**
 * Produces a copy of the given object through {@link InstantiationUtil#deserializeObject}.
 * NOTE(review): the arguments of that call are not visible in this excerpt; presumably the object
 * is serialized first and then deserialized, which would require it to be serializable - confirm.
 *
 * @param object the object to copy
 * @return the copy
 * @throws RuntimeException if copying fails with an IOException or a ClassNotFoundException
 */
private <T> T copyObject(T object) {
try {
return InstantiationUtil.deserializeObject(
} catch (IOException | ClassNotFoundException e) {
// NOTE(review): the caught exception is not passed on as the cause, so the original failure
// and its stack trace are lost; consider handing e to the RuntimeException constructor
throw new RuntimeException("Failed to copy object.");
* Returns the number of required tasks to execute this program.
* @return the number of required tasks to execute this program
* Creates a Flink program that uses the specified spouts and bolts.
public int getNumberOfTasks() {
return this.numberOfTasks;
private void translateTopology() {
// Storm defaults to parallelism 1
/* Translation of topology */
for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
final String spoutId = spout.getKey();
final IRichSpout userSpout = spout.getValue();
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
this.outputStreams.put(spoutId, sourceStreams);
declarers.put(spoutId, declarer);
final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
final DataStreamSource<?> source;
if (sourceStreams.size() == 1) {
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
outputStreams.put(outputStreamId, src);
source = src;
} else {
final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
userSpout, spoutId, null, null);
@SuppressWarnings({ "unchecked", "rawtypes" })
DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
spoutWrapperMultipleOutputs, spoutId,
(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
.split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId)
.map(new SplitStreamMapper<Tuple>());
outputStreams.put(streamId, outStream);
source = multiSource;
availableInputs.put(spoutId, outputStreams);
final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
if (common.is_set_parallelism_hint()) {
int dop = common.get_parallelism_hint();
} else {
* 1. Connect all spout streams with bolt streams
* 2. Then proceed with the bolt streams that are already connected
* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
* its producer
* -> thus, we might need to repeat multiple times
boolean makeProgress = true;
while (bolts.size() > 0) {
if (!makeProgress) {
throw new RuntimeException(
"Unable to build Topology. Could not connect the following bolts: "
+ bolts.keySet());
makeProgress = false;
final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
while (boltsIterator.hasNext()) {
final Entry<String, IRichBolt> bolt = boltsIterator.next();
final String boltId = bolt.getKey();
final IRichBolt userBolt = copyObject(bolt.getValue());
final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
if (unprocessedBoltInputs == null) {
unprocessedBoltInputs = new HashSet<>();
unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
// check if all inputs are available
final int numberOfInputs = unprocessedBoltInputs.size();
int inputsAvailable = 0;
for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
final String producerId = entry.getKey().get_componentId();
final String streamId = entry.getKey().get_streamId();
final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
if (streams != null && streams.get(streamId) != null) {
if (inputsAvailable != numberOfInputs) {
// traverse other bolts first until inputs are available
} else {
makeProgress = true;
final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);
for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
final GlobalStreamId streamId = input.getKey();
final Grouping grouping = input.getValue();
final String producerId = streamId.get_componentId();
final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
final Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next();
GlobalStreamId streamId = firstInput.getKey();
DataStream<Tuple> inputStream = firstInput.getValue();
final SingleOutputStreamOperator<?, ?> outputStream;
switch (numberOfInputs) {
case 1:
outputStream = createOutput(boltId, userBolt, streamId, inputStream);
case 2:
Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next();
GlobalStreamId streamId2 = secondInput.getKey();
DataStream<Tuple> inputStream2 = secondInput.getValue();
outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
throw new UnsupportedOperationException("Don't know how to translate a bolt "
+ boltId + " with " + numberOfInputs + " inputs.");
if (common.is_set_parallelism_hint()) {
int dop = common.get_parallelism_hint();
} else {
/**
 * Looks up one input stream of a bolt among its producer's streams and applies the Flink
 * partitioning that corresponds to the Storm grouping declared for that input.
 *
 * Side effect: registers a new {@link FlinkOutputFieldsDeclarer} (and its output-stream map) for
 * the bolt in {@code declarers} / {@code outputStreams}.
 *
 * @param boltId id of the consuming bolt
 * @param userBolt the consuming bolt (only asserted non-null in the lines visible here)
 * @param streamId producer component id and stream id of the input
 * @param grouping Storm grouping declared for this input
 * @param producer the producer's streams, keyed by stream id
 * @return the producer's stream with the partitioning applied
 * @throws UnsupportedOperationException for groupings other than shuffle, local-or-shuffle,
 *         fields (including the empty-fields "global" form) and all
 */
private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
GlobalStreamId streamId, Grouping grouping,
Map<String, DataStream<Tuple>> producer) {
assert (userBolt != null);
assert(boltId != null);
assert(streamId != null);
assert(grouping != null);
assert(producer != null);
final String producerId = streamId.get_componentId();
final String inputStreamId = streamId.get_streamId();
DataStream<Tuple> inputStream = producer.get(inputStreamId);
// a fresh declarer is created on every call, so for a bolt with several inputs each call
// replaces the entries stored for boltId by the previous call
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
declarers.put(boltId, declarer);
this.outputStreams.put(boltId, declarer.outputStreams);
// if producer was processed already
if (grouping.is_set_shuffle()) {
// Storm uses a round-robin shuffle strategy
inputStream = inputStream.rebalance();
} else if (grouping.is_set_fields()) {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
// the key is derived from the producer's declarer (rest of the expression is not visible in this excerpt)
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
inputStream = inputStream.keyBy(prodDeclarer
} else {
inputStream = inputStream.global();
} else if (grouping.is_set_all()) {
inputStream = inputStream.broadcast();
// local-or-shuffle grouping is accepted and leaves the stream unchanged; anything else is rejected
} else if (!grouping.is_set_local_or_shuffle()) {
throw new UnsupportedOperationException(
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
return inputStream;
/**
 * Single-input convenience overload: delegates to the two-input variant, passing {@code null}
 * for the second stream id and the second input stream.
 */
private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) {
return createOutput(boltId, bolt, streamId, inputStream, null, null);
/**
 * Wraps a bolt in a Flink operator and attaches it to its input stream(s).
 *
 * With one input the bolt is wrapped in a {@link BoltWrapper} and applied via {@code transform};
 * with two inputs the streams are connected and a {@link BoltWrapperTwoInput} is used.
 * If the bolt declares fewer than two output streams, the operator's output is registered in
 * {@code availableInputs} under the bolt id (only when {@code outType} is non-null). Otherwise the
 * output is split per declared stream id, each selected stream is mapped with
 * {@link SplitStreamMapper}, and the resulting streams are registered.
 *
 * @param boltId id of the bolt; also used as the operator name
 * @param bolt the bolt to wrap
 * @param streamId producer component id and stream id of the first input
 * @param inputStream first input stream
 * @param streamId2 id of the second input, or {@code null} for a single-input bolt
 * @param inputStream2 second input stream, or {@code null} for a single-input bolt
 * @return the operator produced by {@code transform}
 * @throws IllegalArgumentException if exactly one of {@code streamId2} / {@code inputStream2} is null
 */
private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt,
GlobalStreamId streamId, DataStream<Tuple> inputStream,
GlobalStreamId streamId2, DataStream<Tuple> inputStream2) {
assert(boltId != null);
assert(streamId != null);
assert(inputStream != null);
// the second stream id and the second stream must be either both present or both absent
Preconditions.checkArgument((streamId2 == null) == (inputStream2 == null));
String producerId = streamId.get_componentId();
String inputStreamId = streamId.get_streamId();
// both lookups rely on entries stored under boltId before this method runs (processInput does that)
final HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId);
final FlinkOutputFieldsDeclarer declarer = declarers.get(boltId);
final SingleOutputStreamOperator<?, ?> outputStream;
if (boltOutputs.size() < 2) { // single output stream or sink
String outputStreamId;
if (boltOutputs.size() == 1) {
outputStreamId = (String) boltOutputs.keySet().toArray()[0];
} else {
outputStreamId = null;
final TypeInformation<Tuple> outType = declarer
final SingleOutputStreamOperator<Tuple, ?> outStream;
// only one input
if (streamId2 == null) {
// NOTE(review): this call passes (producerId, inputStreamId) while the multi-output branch
// below passes (inputStreamId, producerId); both are Strings, so a swap would compile -
// confirm the intended order against the BoltWrapper constructor
BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(
bolt, boltId, producerId, inputStreamId,
this.outputStreams.get(producerId).get(inputStreamId), null);
outStream = inputStream.transform(boltId, outType, boltWrapper);
} else {
String producerId2 = streamId2.get_componentId();
String inputStreamId2 = streamId2.get_streamId();
final BoltWrapperTwoInput<Tuple, Tuple, Tuple> boltWrapper = new BoltWrapperTwoInput<>(
bolt, boltId,
inputStreamId, inputStreamId2, producerId, producerId2,
outStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
if (outType != null) {
// only for non-sink nodes
final HashMap<String, DataStream<Tuple>> op = new HashMap<>();
op.put(outputStreamId, outStream);
availableInputs.put(boltId, op);
outputStream = outStream;
} else {
// two or more declared output streams: records are emitted as SplitStreamType and split below
@SuppressWarnings({ "unchecked", "rawtypes" })
final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream;
// only one input
if (streamId2 == null) {
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(
bolt, boltId, inputStreamId, producerId, this.outputStreams.get(producerId).get(inputStreamId),
multiStream = inputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
} else {
String producerId2 = streamId2.get_componentId();
String inputStreamId2 = streamId2.get_streamId();
final BoltWrapperTwoInput<Tuple, Tuple, SplitStreamType<Tuple>> boltWrapper = new BoltWrapperTwoInput<>(
bolt, boltId,
inputStreamId, inputStreamId2, producerId, producerId2,
multiStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
.split(new StormStreamSelector<Tuple>());
// one selected-and-unwrapped stream per declared output stream id, so consumers can subscribe to each individually
final HashMap<String, DataStream<Tuple>> op = new HashMap<>();
for (String outputStreamId : boltOutputs.keySet()) {
new SplitStreamMapper<Tuple>()));
SingleOutputStreamOperator<Tuple, ?> outStream = splitStream
.select(outputStreamId).map(new SplitStreamMapper<Tuple>());
op.put(outputStreamId, outStream);
availableInputs.put(boltId, op);
outputStream = multiStream;
return outputStream;
// for internal testing purpose only
public StormTopology getStormTopology() {
return this.stormTopology;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.api;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
* {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
* topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
* implementation to ensure equal behavior.<br>
* <br>
* <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
public class FlinkTopologyBuilder {
/** A Storm {@link TopologyBuilder} to build a real Storm topology */
private final TopologyBuilder stormBuilder = new TopologyBuilder();
/** All user spouts by their ID */
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
/** All user bolts by their ID */
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
/** All declared streams and output schemas by operator ID */
private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
/** All spouts&bolts declarers by their ID */
private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
// needs to be a class member for internal testing purpose
private StormTopology stormTopology;
* Creates a Flink program that uses the specified spouts and bolts.
public FlinkTopology createTopology() {
this.stormTopology = this.stormBuilder.createTopology();
final FlinkTopology env = new FlinkTopology();
final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>();
for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
final String spoutId = spout.getKey();
final IRichSpout userSpout = spout.getValue();
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
this.outputStreams.put(spoutId, sourceStreams);
declarers.put(spoutId, declarer);
final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
final DataStreamSource<?> source;
if (sourceStreams.size() == 1) {
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
outputStreams.put(outputStreamId, src);
source = src;
} else {
final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
userSpout, spoutId, null, null);
@SuppressWarnings({ "unchecked", "rawtypes" })
DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
spoutWrapperMultipleOutputs, spoutId,
(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
.split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId)
.map(new SplitStreamMapper<Tuple>());
outputStreams.put(streamId, outStream);
source = multiSource;
availableInputs.put(spoutId, outputStreams);
int dop = 1;
final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
} else {
final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
/* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
* its producer
* ->thus, we might need to repeat multiple times
boolean makeProgress = true;
while (unprocessedBolts.size() > 0) {
if (!makeProgress) {
throw new RuntimeException(
"Unable to build Topology. Could not connect the following bolts: "
+ unprocessedBolts.keySet());
makeProgress = false;
final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
while (boltsIterator.hasNext()) {
final Entry<String, IRichBolt> bolt = boltsIterator.next();
final String boltId = bolt.getKey();
final IRichBolt userBolt = bolt.getValue();
final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
if (unprocessedInputs == null) {
unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
unprocessdInputsPerBolt.put(boltId, unprocessedInputs);
// connect each available producer to the current bolt
final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
while (inputStreamsIterator.hasNext()) {
final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
final String producerId = stormInputStream.getKey().get_componentId();
final String inputStreamId = stormInputStream.getKey().get_streamId();
final HashMap<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
if (producer != null) {
makeProgress = true;
DataStream<Tuple> inputStream = producer.get(inputStreamId);
if (inputStream != null) {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
this.outputStreams.put(boltId, boltOutputStreams);
this.declarers.put(boltId, declarer);
// if producer was processed already
final Grouping grouping = stormInputStream.getValue();
if (grouping.is_set_shuffle()) {
// Storm uses a round-robin shuffle strategy
inputStream = inputStream.rebalance();
} else if (grouping.is_set_fields()) {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
inputStream = inputStream.keyBy(prodDeclarer
} else {
inputStream = inputStream.global();
} else if (grouping.is_set_all()) {
inputStream = inputStream.broadcast();
} else if (!grouping.is_set_local_or_shuffle()) {
throw new UnsupportedOperationException(
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
final SingleOutputStreamOperator<?, ?> outputStream;
if (boltOutputStreams.size() < 2) { // single output stream or sink
String outputStreamId = null;
if (boltOutputStreams.size() == 1) {
outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
final TypeInformation<Tuple> outType = declarer
final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>(
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
.transform(boltId, outType, boltWrapperSingleOutput);
if (outType != null) {
// only for non-sink nodes
final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
op.put(outputStreamId, outStream);
availableInputs.put(boltId, op);
outputStream = outStream;
} else {
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
@SuppressWarnings({ "unchecked", "rawtypes" })
final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream
.transform(boltId, outType, boltWrapperMultipleOutputs);
final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
.split(new StormStreamSelector<Tuple>());
final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
for (String outputStreamId : boltOutputStreams.keySet()) {
SingleOutputStreamOperator<Tuple, ?> outStream = splitStream
new SplitStreamMapper<Tuple>());
op.put(outputStreamId, outStream);
availableInputs.put(boltId, op);
outputStream = multiStream;
int dop = 1;
if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
} else {
} else {
throw new RuntimeException("Cannot connect '" + boltId + "' to '"
+ producerId + "'. Stream '" + inputStreamId + "' not found.");
if (unprocessedInputs.size() == 0) {
// all inputs are connected; processing bolt completed
return env;
* Define a new bolt in this topology with parallelism of just one thread.
* @param id
* the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt
* the bolt
* @return use the returned object to declare the inputs to this component
public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
return this.setBolt(id, bolt, null);
* Define a new bolt in this topology with the specified amount of parallelism.
* @param id
* the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt
* the bolt
* @param parallelism_hint
* the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
* process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
this.bolts.put(id, bolt);
return declarer;
* Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
* kind
* of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
* @param id
* the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt
* the basic bolt
* @return use the returned object to declare the inputs to this component
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
return this.setBolt(id, bolt, null);
* Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
* kind
* of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
* @param id
* the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt
* the basic bolt
* @param parallelism_hint
* the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
* process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
* Define a new spout in this topology.
* @param id
* the id of this component. This id is referenced by other components that want to consume this spout's
* outputs.
* @param spout
* the spout
public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
return this.setSpout(id, spout, null);
* Define a new spout in this topology with the specified parallelism. If the spout declares itself as
* non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
* @param id
* the id of this component. This id is referenced by other components that want to consume this spout's
* outputs.
* @param parallelism_hint
* the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
* process somewhere around the cluster.
* @param spout
* the spout
public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
this.spouts.put(id, spout);
return declarer;
// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
/* not implemented by Storm 0.9.4
* public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
* this.stormBuilder.setStateSpout(id, stateSpout);
* }
* public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
* this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
* }
// for internal testing purpose only
StormTopology getStormTopology() {
return this.stormTopology;
......@@ -17,15 +17,14 @@
package org.apache.flink.storm.util;
import backtype.storm.Config;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import backtype.storm.Config;
* {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
* object) for embedded Spouts and Bolts.
......@@ -17,15 +17,15 @@
package org.apache.flink.storm.util;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
* Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
* Used by {@link FlinkTopology} to split multiple declared output streams within Flink.
public final class StormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
private static final long serialVersionUID = 2553423379715401023L;
......@@ -16,15 +16,13 @@
package org.apache.flink.storm.wrappers;
import java.util.Collection;
import java.util.HashMap;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
......@@ -36,7 +34,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashMap;
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
......@@ -53,21 +52,33 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
private static final long serialVersionUID = -4788589118464155835L;
/** The wrapped Storm {@link IRichBolt bolt}. */
private final IRichBolt bolt;
protected final IRichBolt bolt;
/** The name of the bolt. */
private final String name;
/** Number of attributes of the bolt's output tuples per stream. */
private final HashMap<String, Integer> numberOfAttributes;
protected final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
private final Fields inputSchema;
protected final Fields inputSchema;
/** The original Storm topology. */
protected StormTopology stormTopology;
/** The topology context of the bolt */
protected transient TopologyContext topologyContext;
/** The component id of the input stream for this bolt */
protected final String inputComponentId;
/** The stream id of the input stream for this bolt */
protected final String inputStreamId;
public final static String DEFAULT_OPERATOR_ID = "defaultID";
public final static String DEFUALT_BOLT_NAME = "defaultBoltName";
* We have to use this because Operators must output
* {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
private transient TimestampedCollector<OUT> flinkCollector;
protected transient TimestampedCollector<OUT> flinkCollector;
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
......@@ -75,8 +86,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
* declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
* If the number of declared output attributes is not within range [0;25].
......@@ -89,11 +99,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
* the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema The schema (ie, ordered field names) of the input stream.
* @throws IllegalArgumentException
* If the number of declared output attributes is not within range [0;25].
......@@ -108,16 +116,13 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
* of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range [1;25].
public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
......@@ -131,8 +136,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
* of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
......@@ -153,8 +157,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
......@@ -166,7 +169,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* [0;25].
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final String[] rawOutputs) throws IllegalArgumentException {
final String[] rawOutputs) throws IllegalArgumentException {
this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
......@@ -176,11 +179,11 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
......@@ -190,8 +193,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* [0;25].
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, null, inputSchema, rawOutputs);
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, DEFUALT_BOLT_NAME, Utils.DEFAULT_STREAM_ID, DEFAULT_OPERATOR_ID, inputSchema, rawOutputs);
......@@ -201,10 +204,10 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param name
* The name of the bolt.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param name The name of the bolt.
* @param inputStreamId The stream id of the input stream for this bolt
* @param inputComponentId The component id of the input stream for this bolt
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
......@@ -215,10 +218,13 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [0;25].
public BoltWrapper(final IRichBolt bolt, final String name, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
public BoltWrapper(final IRichBolt bolt, final String name,
final String inputStreamId, final String inputComponentId,
final Fields inputSchema, final Collection<String> rawOutputs) throws IllegalArgumentException {
this.bolt = bolt;
this.name = name;
this.inputComponentId = inputComponentId;
this.inputStreamId = inputStreamId;
this.inputSchema = inputSchema;
this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
......@@ -237,7 +243,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
public void open() throws Exception {
this.flinkCollector = new TimestampedCollector<OUT>(output);
this.flinkCollector = new TimestampedCollector<>(output);
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
......@@ -252,9 +258,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
topologyContext = WrapperSetupHelper.createTopologyContext(
getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
this.bolt.prepare(stormConfig, topologyContext, stormCollector);
......@@ -267,7 +272,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
public void processElement(final StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
this.bolt.execute(new StormTuple<IN>(value, inputSchema));
this.bolt.execute(new StormTuple<>(value, inputSchema, topologyContext.getThisTaskId(), inputStreamId, inputComponentId));
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.wrappers;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
* A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
* program. In contrast to {@link BoltWrapper}, this wrapper takes two input streams as input.
public class BoltWrapperTwoInput<IN1, IN2, OUT> extends BoltWrapper<IN1, OUT> implements TwoInputStreamOperator<IN1, IN2, OUT> {
/** The schema (ie, ordered field names) of the second input stream. */
private final Fields inputSchema2;
/** The component id of the second input stream of the bolt */
private final String componentId2;
/** The stream id of the second input stream of the bolt */
private final String streamId2;
* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param boltId The name of the bolt.
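* @param streamId1 The stream id of the first input stream for this bolt
* @param streamId2 The stream id of the second input stream for this bolt
* @param componentId1 The component id of the first input stream for this bolt
* @param componentId2 The component id of the second input stream for this bolt
* @param inputSchema1
* The schema (ie, ordered field names) of the first input stream.
* @param inputSchema2
* The schema (ie, ordered field names) of the second input stream.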
* @throws IllegalArgumentException
* If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [0;25].
* */
public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
final String streamId1, final String streamId2,
final String componentId1, final String componentId2,
final Fields inputSchema1, final Fields inputSchema2) throws IllegalArgumentException {
this(bolt, boltId, streamId1, streamId2, componentId1, componentId2, inputSchema1, inputSchema2, null);
* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param boltId The name of the bolt.
* @param streamId1 The stream id of the first input stream for this bolt
* @param streamId2 The stream id of the second input stream for this bolt
* @param componentId1 The component id of the first input stream for this bolt
* @param componentId2 The component id of the second input stream for this bolt
* @param inputSchema1
* The schema (ie, ordered field names) of the first input stream.
* @param inputSchema2
* The schema (ie, ordered field names) of the second input stream.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [0;25].
public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
final String streamId1, final String streamId2,
final String componentId1, final String componentId2,
final Fields inputSchema1, final Fields inputSchema2,
final Collection<String> rawOutputs) throws IllegalArgumentException {
super(bolt, boltId, streamId1, componentId1, inputSchema1, rawOutputs);
this.componentId2 = componentId2;
this.streamId2 = streamId2;
this.inputSchema2 = inputSchema2;
* Sets the original Storm topology.
* @param stormTopology
* The original Storm topology.
public void setStormTopology(StormTopology stormTopology) {
this.stormTopology = stormTopology;
public void processElement1(final StreamRecord<IN1> element) throws Exception {
public void processElement2(StreamRecord<IN2> element) throws Exception {
IN2 value = element.getValue();
this.bolt.execute(new StormTuple<>(value, inputSchema2, topologyContext.getThisTaskId(), streamId2, componentId2));
public void processWatermark1(Watermark mark) throws Exception {
public void processWatermark2(Watermark mark) throws Exception {
......@@ -27,13 +27,12 @@ import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.state.ISubscribedState;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import clojure.lang.Atom;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import clojure.lang.Atom;
* {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable when
* a Storm topology is executed within Flink.
......@@ -17,12 +17,12 @@
package org.apache.flink.storm.wrappers;
import java.util.HashMap;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.util.HashMap;
* {@link SetupOutputFieldsDeclarer} is used by {@link WrapperSetupHelper} to determine the output streams and
* number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
......@@ -18,7 +18,6 @@
package org.apache.flink.storm.wrappers;
import backtype.storm.spout.ISpoutOutputCollector;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
......@@ -27,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
* A {@link SpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm
* A {@link SpoutCollector} is used by {@link SpoutWrapper} to provide a Storm
* compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
* Flink tuples and emits them via the provided {@link SourceContext} object.
......@@ -17,13 +17,10 @@
package org.apache.flink.storm.wrappers;
import java.util.Collection;
import java.util.HashMap;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
......@@ -33,7 +30,8 @@ import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashMap;
* A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
......@@ -29,6 +29,8 @@ import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.google.common.base.Preconditions;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
......@@ -44,16 +46,32 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
/** The task id where this tuple is processed */
private final int taskId;
/** The stream id of the producer of this tuple */
private final String producerStreamId;
/** The producer's component id of this tuple */
private final String producerComponentId;
/** The message that is associated with this tuple */
private final MessageId id;
* Constructor which sets defaults for taskId, producerStreamId, and producerComponentId
* @param flinkTuple the Flink tuple
* @param schema The schema of the storm fields
StormTuple(final IN flinkTuple, final Fields schema) {
this(flinkTuple, schema, -1, Utils.DEFAULT_STREAM_ID, BoltWrapper.DEFAULT_OPERATOR_ID);
* Create a new Storm tuple from the given Flink tuple. The provided {@code schema} is ignored for raw input
* types.
* @param flinkTuple
* The Flink tuple to be converted.
* @param schema
* The schema (ie, ordered field names) of the tuple.
* @param flinkTuple The Flink tuple to be converted.
* @param schema The schema (ie, ordered field names) of the tuple.
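* @param taskId The task id where this tuple is processed.
* @param producerStreamId The stream id of the producer.
* @param producerComponentId The component id of the producer.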
StormTuple(final IN flinkTuple, final Fields schema) {
StormTuple(final IN flinkTuple, final Fields schema, int taskId, String producerStreamId, String producerComponentId) {
if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
this.schema = schema;
final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
......@@ -67,6 +85,11 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
this.schema = null;
this.stormTuple = new Values(flinkTuple);
this.taskId = Preconditions.checkNotNull(taskId);
this.producerStreamId = Preconditions.checkNotNull(producerStreamId);
this.producerComponentId = Preconditions.checkNotNull(producerComponentId);
this.id = Preconditions.checkNotNull(MessageId.makeUnanchored());
......@@ -266,32 +289,27 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
public GlobalStreamId getSourceGlobalStreamid() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
return new GlobalStreamId(getSourceComponent(), producerStreamId);
public String getSourceComponent() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
return producerComponentId;
public int getSourceTask() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
return taskId;
public String getSourceStreamId() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
return producerStreamId;
public MessageId getMessageId() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
return id;
......@@ -324,4 +342,8 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
return true;
public String toString() {
return "StormTuple{ "+ stormTuple.toString() +" }";
......@@ -28,10 +28,8 @@ import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import clojure.lang.Atom;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.util.ArrayList;
import java.util.Collection;
......@@ -41,7 +39,7 @@ import java.util.Map;
import java.util.Map.Entry;
* {@link WrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} and
* {@link WrapperSetupHelper} is a helper class used by {@link SpoutWrapper} and
* {@link BoltWrapper}.
class WrapperSetupHelper {
......@@ -50,7 +48,7 @@ class WrapperSetupHelper {
final static String TOPOLOGY_NAME = "storm.topology.name";
* Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link BoltWrapper}
* Computes the number of output attributes used by a {@link SpoutWrapper} or {@link BoltWrapper}
* per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for
* output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to
* {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}.
......@@ -93,7 +91,7 @@ class WrapperSetupHelper {
return declarer.outputSchemas;
/** Used to computed unique task IDs for a Storm topology. */
/** Used to compute unique task IDs for a Storm topology. */
private static int tid;
......@@ -113,6 +111,7 @@ class WrapperSetupHelper {
static synchronized TopologyContext createTopologyContext(
final StreamingRuntimeContext context, final IComponent spoutOrBolt,
final String operatorName, StormTopology stormTopology, final Map stormConfig) {
final int dop = context.getNumberOfParallelSubtasks();
final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
......@@ -158,7 +157,7 @@ class WrapperSetupHelper {
componentToStreamToFields.put(operatorName, declarer.outputStreams);
} else {
// whole topology is built (ie, FlinkTopologyBuilder is used)
// whole topology is built (i.e. FlinkTopology is used)
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
Map<String, Bolt> bolts = stormTopology.get_bolts();
Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts();
......@@ -182,14 +181,15 @@ class WrapperSetupHelper {
for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) {
Integer rc = taskId = processSingleOperator(stateSpout.getKey(), stateSpout
Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout
.getValue().get_common(), operatorName, context.getIndexOfThisSubtask(),
dop, taskToComponents, componentToSortedTasks, componentToStreamToFields);
if (rc != null) {
taskId = rc;
assert (taskId != null);
assert(taskId != null);
if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
......@@ -224,7 +224,7 @@ class WrapperSetupHelper {
* OUTPUT: A map from all component IDs to their output streams and output fields.
* @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
* Flink operator ({@link operatorName}) -- {@code null} otherwise.
* Flink operator {@code operatorName} -- {@code null} otherwise.
private static Integer processSingleOperator(final String componentId,
final ComponentCommon common, final String operatorName, final int index,
......@@ -246,9 +246,6 @@ class WrapperSetupHelper {
componentToSortedTasks.put(componentId, sortedTasks);
if (componentId.equals(operatorName)) {
Map<String, Fields> outputStreams = new HashMap<String, Fields>();
for(Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) {
outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields()));
......@@ -18,9 +18,7 @@ package org.apache.flink.storm.api;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
import org.apache.flink.storm.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.api;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
import org.junit.Ignore;
import org.junit.Test;
import backtype.storm.tuple.Fields;
public class FlinkTopologyBuilderTest {
@Test(expected = RuntimeException.class)
public void testUnknowSpout() {
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
@Test(expected = RuntimeException.class)
public void testUnknowBolt() {
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
@Test(expected = RuntimeException.class)
public void testUndeclaredStream() {
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
flinkBuilder.setSpout("spout", new TestDummySpout());
flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
TestDummySpout.spoutStreamId, new Fields("id"));
public void testFieldsGroupingOnMultipleBoltOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
flinkBuilder.setSpout("spout", new TestDummySpout());
flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
TestDummyBolt.groupingStreamId, new Fields("id"));
......@@ -14,50 +14,77 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.api;
import org.apache.flink.storm.api.FlinkTopology;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class FlinkTopologyTest {
public void testDefaultParallelism() {
final FlinkTopology topology = new FlinkTopology();
Assert.assertEquals(1, topology.getParallelism());
final TopologyBuilder builder = new TopologyBuilder();
final FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);
Assert.assertEquals(1, flinkTopology.getExecutionEnvironment().getParallelism());
@Test(expected = UnsupportedOperationException.class)
public void testExecute() throws Exception {
new FlinkTopology().execute();
@Test(expected = RuntimeException.class)
public void testUnknowSpout() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
@Test(expected = UnsupportedOperationException.class)
public void testExecuteWithName() throws Exception {
new FlinkTopology().execute(null);
@Test(expected = RuntimeException.class)
public void testUnknowBolt() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
public void testNumberOfTasks() {
final FlinkTopology topology = new FlinkTopology();
@Test(expected = RuntimeException.class)
public void testUndeclaredStream() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
Assert.assertEquals(0, topology.getNumberOfTasks());
Assert.assertEquals(3, topology.getNumberOfTasks());
public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
TopologyBuilder builder = new TopologyBuilder();
Assert.assertEquals(5, topology.getNumberOfTasks());
builder.setSpout("spout", new TestDummySpout());
builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
TestDummySpout.spoutStreamId, new Fields("id"));
Assert.assertEquals(13, topology.getNumberOfTasks());
@Test(expected = AssertionError.class)
public void testAssert() {
new FlinkTopology().increaseNumberOfTasks(0);
public void testFieldsGroupingOnMultipleBoltOutputStreams() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestDummySpout());
builder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
builder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
TestDummyBolt.groupingStreamId, new Fields("id"));
......@@ -16,14 +16,14 @@
package org.apache.flink.storm.api;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.Map;
public class TestBolt implements IRichBolt {
private static final long serialVersionUID = -667148827441397683L;
......@@ -16,13 +16,13 @@
package org.apache.flink.storm.api;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
public class TestSpout implements IRichSpout {
private static final long serialVersionUID = -4884029383198924007L;
......@@ -17,13 +17,11 @@
package org.apache.flink.storm.util;
import java.util.Iterator;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.junit.Assert;
import org.junit.Test;
import java.util.Iterator;
public class StormStreamSelectorTest {
......@@ -16,8 +16,6 @@
package org.apache.flink.storm.util;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
......@@ -26,6 +24,8 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class TestDummyBolt implements IRichBolt {
private static final long serialVersionUID = 6893611247443121322L;
......@@ -16,8 +16,6 @@
package org.apache.flink.storm.util;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
......@@ -26,6 +24,8 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
public class TestDummySpout implements IRichSpout {
private static final long serialVersionUID = -5190945609124603118L;
......@@ -16,16 +16,16 @@
package org.apache.flink.storm.util;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class TestSink implements IRichBolt {
private static final long serialVersionUID = 4314871456719370877L;
......@@ -18,10 +18,8 @@
package org.apache.flink.storm.wrappers;
import backtype.storm.tuple.Values;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.wrappers.BoltCollector;
import org.apache.flink.streaming.api.operators.Output;
import org.junit.Assert;
import org.junit.Test;
......@@ -24,7 +24,6 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
......@@ -37,9 +36,9 @@ import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;
......@@ -54,6 +53,7 @@ import java.util.Map.Entry;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
......@@ -70,8 +70,7 @@ public class BoltWrapperTest extends AbstractTest {
declarer.declare(new Fields("dummy1", "dummy2"));
new BoltWrapper<Object, Object>(mock(IRichBolt.class),
new String[] { Utils.DEFAULT_STREAM_ID });
new BoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] { Utils.DEFAULT_STREAM_ID });
@Test(expected = IllegalArgumentException.class)
......@@ -178,7 +177,7 @@ public class BoltWrapperTest extends AbstractTest {
final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw);
wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
......@@ -270,7 +269,7 @@ public class BoltWrapperTest extends AbstractTest {
wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class));
......@@ -17,8 +17,6 @@
package org.apache.flink.storm.wrappers;
import java.util.HashMap;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
......@@ -26,11 +24,11 @@ import backtype.storm.generated.StormTopology;
import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.IReducer;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.wrappers.FlinkTopologyContext;
import org.junit.Test;
import java.util.HashMap;
* FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
......@@ -19,9 +19,7 @@ package org.apache.flink.storm.wrappers;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
import org.junit.Assert;
import org.junit.Test;
......@@ -18,10 +18,8 @@
package org.apache.flink.storm.wrappers;
import backtype.storm.tuple.Values;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.wrappers.SpoutCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.junit.Assert;
import org.junit.Test;
......@@ -21,7 +21,6 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
......@@ -17,17 +17,17 @@
package org.apache.flink.storm.wrappers;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.wrappers.StormTuple;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
......@@ -613,29 +613,38 @@ public class StormTupleTest extends AbstractTest {
return new StormTuple(tuple, schema);
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceGlobalStreamid() {
new StormTuple<Object>(null, null).getSourceGlobalStreamid();
GlobalStreamId globalStreamid =
new StormTuple<>(new Tuple1(), null, 0, "streamId", "componentID").getSourceGlobalStreamid();
Assert.assertEquals("streamId", globalStreamid.get_streamId());
Assert.assertEquals("componentID", globalStreamid.get_componentId());
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceComponent() {
new StormTuple<Object>(null, null).getSourceComponent();
String sourceComponent =
new StormTuple<>(new Tuple1(), null, 0, "streamId", "componentID").getSourceComponent();
Assert.assertEquals("componentID", sourceComponent);
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceTask() {
new StormTuple<Object>(null, null).getSourceTask();
String sourceStreamId =
new StormTuple<>(new Tuple1(), null, 0, "streamId", "componentID").getSourceStreamId();
Assert.assertEquals("streamId", sourceStreamId);
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceStreamId() {
new StormTuple<Object>(null, null).getSourceStreamId();
String sourceStreamId =
new StormTuple<>(new Tuple1(), null, 0, "streamId", "componentID").getSourceStreamId();
Assert.assertEquals("streamId", sourceStreamId);
@Test(expected = UnsupportedOperationException.class)
public void testGetMessageId() {
new StormTuple<Object>(null, null).getMessageId();
Assert.assertNotNull(new StormTuple<>(null, null).getMessageId());
public static class TestPojoMember<T> {
......@@ -16,12 +16,6 @@
package org.apache.flink.storm.wrappers;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.ComponentCommon;
......@@ -33,8 +27,8 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.TestTopologyBuilder;
import com.google.common.collect.Sets;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
......@@ -48,7 +42,11 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
......@@ -180,8 +178,6 @@ public class WrapperSetupHelperTest extends AbstractTest {
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
int counter = 0;
......@@ -193,24 +189,22 @@ public class WrapperSetupHelperTest extends AbstractTest {
Utils.sleep(++counter * 10000);
if (TestSink.result.size() == 8) {
if (TestSink.result.size() == 5) {
TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
TopologyBuilder stormBuilder = new TopologyBuilder();
flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
flinkBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
stormBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
stormBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
stormBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
stormBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
stormBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
StormTopology stormTopology = flinkBuilder.getStormTopology();
Set<Integer> taskIds = new HashSet<Integer>();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册