提交 f7b113d9 编写于 作者: M mjsax

[FLINK-2837] [Storm Compatibility] FlinkTopologyBuilder cannot handle multiple input streams

- removed BoltWrapperTwoInputs
- add generic multi-input stream bolt wrapper
- updated FlinkTopology to translate multi-input bolts correctly
- added multi-input union test wrapper
- updated FlinkTopology to translate multi-input bolts correctly
- added multi-input union test
上级 8e95214b
......@@ -49,7 +49,7 @@ public class SingleJoinExample {
// emit result
if (args.length > 0) {
// read the text file from given input path
// read the text file from given input path
builder.setBolt("fileOutput", new BoltFileSink(args[0], new TupleOutputFormatter()))
} else {
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.tests;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.MergerBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class StormUnionITCase extends StreamingProgramTestBase {
private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n"
+ "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n"
+ "155629808\n" + "1429008869\n" + "-1465154083\n" + "-723955400\n" + "-423279216\n"
+ "17850135\n" + "2133836778\n" + "1033096058\n" + "-1690734402\n" + "-1557280266\n"
+ "1327362106\n" + "-1930858313\n" + "502539523\n" + "-1728529858\n" + "-938301587\n"
+ "-624140595\n" + "-60658084\n" + "142959438\n" + "-613647601\n" + "-330177159\n"
+ "-54027108\n" + "1945002173\n" + "979930868";
private final static String topologyId = "Multiple Input Streams Test";
private final static String spoutId1 = "spout1";
private final static String spoutId2 = "spout2";
private final static String spoutId3 = "spout3";
private final static String boltId = "merger";
private final static String sinkId = "sink";
private String resultPath;
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(RESULT, this.resultPath);
protected void testProgram() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
// get input data
builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
builder.setBolt(boltId, new MergerBolt())
.shuffleGrouping(spoutId1, FiniteRandomSpout.STREAM_PREFIX + 0)
.shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0)
.shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0);
final String[] tokens = this.resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
Utils.sleep(10 * 1000);
// TODO kill does no do anything so far
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.tests.operators;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
public class MergerBolt extends BaseRichBolt {
private static final long serialVersionUID = -7966475984592762720L;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
public void execute(Tuple input) {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
......@@ -27,7 +27,7 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
......@@ -36,8 +36,9 @@ import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
import org.apache.flink.storm.wrappers.MergedInputsBoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
......@@ -139,7 +140,7 @@ public class FlinkTopology {
return InstantiationUtil.deserializeObject(
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Failed to copy object.");
......@@ -218,13 +219,13 @@ public class FlinkTopology {
* 1. Connect all spout streams with bolts streams
* 2. Then proceed with the bolts stream already connected
* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
* its producer
* ->thus, we might need to repeat multiple times
* 1. Connect all spout streams with bolts streams
* 2. Then proceed with the bolts stream already connected
* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
* its producer
* ->thus, we might need to repeat multiple times
boolean makeProgress = true;
while (bolts.size() > 0) {
if (!makeProgress) {
......@@ -283,28 +284,8 @@ public class FlinkTopology {
inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
final Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next();
GlobalStreamId streamId = firstInput.getKey();
DataStream<Tuple> inputStream = firstInput.getValue();
final SingleOutputStreamOperator<?, ?> outputStream;
switch (numberOfInputs) {
case 1:
outputStream = createOutput(boltId, userBolt, streamId, inputStream);
case 2:
Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next();
GlobalStreamId streamId2 = secondInput.getKey();
DataStream<Tuple> inputStream2 = secondInput.getValue();
outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
throw new UnsupportedOperationException("Don't know how to translate a bolt "
+ boltId + " with " + numberOfInputs + " inputs.");
final SingleOutputStreamOperator<?, ?> outputStream = createOutput(boltId,
userBolt, inputStreams);
if (common.is_set_parallelism_hint()) {
int dop = common.get_parallelism_hint();
......@@ -318,14 +299,14 @@ public class FlinkTopology {
private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
GlobalStreamId streamId, Grouping grouping,
Map<String, DataStream<Tuple>> producer) {
GlobalStreamId streamId, Grouping grouping,
Map<String, DataStream<Tuple>> producer) {
assert (userBolt != null);
assert(boltId != null);
assert(streamId != null);
assert(grouping != null);
assert(producer != null);
assert (boltId != null);
assert (streamId != null);
assert (grouping != null);
assert (producer != null);
final String producerId = streamId.get_componentId();
final String inputStreamId = streamId.get_streamId();
......@@ -362,24 +343,50 @@ public class FlinkTopology {
return inputStream;
private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) {
return createOutput(boltId, bolt, streamId, inputStream, null, null);
@SuppressWarnings({ "unchecked", "rawtypes" })
private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt,
GlobalStreamId streamId, DataStream<Tuple> inputStream,
GlobalStreamId streamId2, DataStream<Tuple> inputStream2) {
assert(boltId != null);
assert(streamId != null);
assert(inputStream != null);
Preconditions.checkArgument((streamId2 == null) == (inputStream2 == null));
String producerId = streamId.get_componentId();
String inputStreamId = streamId.get_streamId();
Map<GlobalStreamId, DataStream<Tuple>> inputStreams) {
assert (boltId != null);
assert (bolt != null);
assert (inputStreams != null);
Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet()
Entry<GlobalStreamId, DataStream<Tuple>> input1 = iterator.next();
GlobalStreamId streamId1 = input1.getKey();
String inputStreamId1 = streamId1.get_streamId();
String inputComponentId1 = streamId1.get_componentId();
Fields inputSchema1 = this.outputStreams.get(inputComponentId1).get(inputStreamId1);
DataStream<Tuple> singleInputStream = input1.getValue();
DataStream<StormTuple<Tuple>> mergedInputStream = null;
while (iterator.hasNext()) {
Entry<GlobalStreamId, DataStream<Tuple>> input2 = iterator.next();
GlobalStreamId streamId2 = input2.getKey();
DataStream<Tuple> inputStream2 = input2.getValue();
if (mergedInputStream == null) {
mergedInputStream = singleInputStream
new TwoFlinkStreamsMerger(streamId1, inputSchema1,
streamId2, this.outputStreams.get(
} else {
mergedInputStream = mergedInputStream
new StormFlinkStreamMerger(streamId2, this.outputStreams.get(
final HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId);
final FlinkOutputFieldsDeclarer declarer = declarers.get(boltId);
final FlinkOutputFieldsDeclarer declarer = this.declarers.get(boltId);
final SingleOutputStreamOperator<?, ?> outputStream;
......@@ -391,34 +398,21 @@ public class FlinkTopology {
outputStreamId = null;
final TypeInformation<Tuple> outType = declarer
final TypeInformation<Tuple> outType = declarer.getOutputType(outputStreamId);
final SingleOutputStreamOperator<Tuple, ?> outStream;
// only one input
if (streamId2 == null) {
BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(
bolt, boltId, producerId, inputStreamId,
this.outputStreams.get(producerId).get(inputStreamId), null);
if(inputStreams.entrySet().size() == 1) {
BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(bolt, boltId,
inputStreamId1, inputComponentId1, inputSchema1, null);
outStream = inputStream.transform(boltId, outType, boltWrapper);
outStream = singleInputStream.transform(boltId, outType, boltWrapper);
} else {
String producerId2 = streamId2.get_componentId();
String inputStreamId2 = streamId2.get_streamId();
final BoltWrapperTwoInput<Tuple, Tuple, Tuple> boltWrapper = new BoltWrapperTwoInput<>(
bolt, boltId,
inputStreamId, inputStreamId2, producerId, producerId2,
MergedInputsBoltWrapper<Tuple, Tuple> boltWrapper = new MergedInputsBoltWrapper<Tuple, Tuple>(
bolt, boltId, null);
outStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
outStream = mergedInputStream.transform(boltId, outType, boltWrapper);
if (outType != null) {
......@@ -429,36 +423,22 @@ public class FlinkTopology {
outputStream = outStream;
} else {
@SuppressWarnings({ "unchecked", "rawtypes" })
final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream;
// only one input
if (streamId2 == null) {
if(inputStreams.entrySet().size() == 1) {
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(
bolt, boltId, inputStreamId, producerId, this.outputStreams.get(producerId).get(inputStreamId),
bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);
multiStream = inputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
multiStream = singleInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
} else {
String producerId2 = streamId2.get_componentId();
String inputStreamId2 = streamId2.get_streamId();
final BoltWrapperTwoInput<Tuple, Tuple, SplitStreamType<Tuple>> boltWrapper = new BoltWrapperTwoInput<>(
bolt, boltId,
inputStreamId, inputStreamId2, producerId, producerId2,
multiStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
final MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>>(
bolt, boltId, null);
multiStream = mergedInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
* /* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
package org.apache.flink.storm.api;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Fields;
* Merges a stream of type {@link StormTuple} with a Flink {@link DataStreams} into a stream of type {@link StormTuple}.
final class StormFlinkStreamMerger<IN1, IN2> implements CoFlatMapFunction<StormTuple<IN1>, IN2, StormTuple> {
private static final long serialVersionUID = -914164633830563631L;
private final String inputStreamId;
private final String inputComponentId;
private final Fields inputSchema;
public StormFlinkStreamMerger(GlobalStreamId streamId, Fields schema) {
this.inputStreamId = streamId.get_streamId();
this.inputComponentId = streamId.get_componentId();
this.inputSchema = schema;
public void flatMap1(StormTuple<IN1> value, Collector<StormTuple> out) throws Exception {
public void flatMap2(IN2 value, Collector<StormTuple> out) throws Exception {
out.collect(new StormTuple<IN2>(value, this.inputSchema, 0, this.inputStreamId,
\ No newline at end of file
* /* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
package org.apache.flink.storm.api;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Fields;
* Merges two Flink {@link DataStreams} into a stream of type {@link StormTuple}.
final class TwoFlinkStreamsMerger<IN1, IN2> implements CoFlatMapFunction<IN1, IN2, StormTuple> {
private static final long serialVersionUID = -495174165824062256L;
private final String inputStreamId1;
private final String inputComponentId1;
private final Fields inputSchema1;
private final String inputStreamId2;
private final String inputComponentId2;
private final Fields inputSchema2;
public TwoFlinkStreamsMerger(GlobalStreamId streamId1, Fields schema1, GlobalStreamId streamId2,
Fields schema2) {
this.inputStreamId1 = streamId1.get_streamId();
this.inputComponentId1 = streamId1.get_componentId();
this.inputSchema1 = schema1;
this.inputStreamId2 = streamId2.get_streamId();
this.inputComponentId2 = streamId2.get_componentId();
this.inputSchema2 = schema2;
public void flatMap1(IN1 value, Collector<StormTuple> out) throws Exception {
out.collect(new StormTuple<IN1>(value, this.inputSchema1, 0,
this.inputStreamId1, this.inputComponentId1));
public void flatMap2(IN2 value, Collector<StormTuple> out) throws Exception {
out.collect(new StormTuple<IN2>(value, this.inputSchema2, 0,
this.inputStreamId2, this.inputComponentId2));
\ No newline at end of file
......@@ -38,55 +38,54 @@ import java.util.Collection;
import java.util.HashMap;
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
* program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
* bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
* {@code OUT} (see {@link AbstractStormCollector} for supported types).<br>
* <br>
* <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration
* <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method.
* Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so
* far.</strong>
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
* It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
* process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type {@code OUT}
* (see {@link AbstractStormCollector} for supported types).<br/>
* <br/>
* <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream
* Bolts.</strong>
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = -4788589118464155835L;
/** The default input component ID. */
public final static String DEFAULT_ID = "default ID";
/** The default bolt ID. */
public final static String DEFUALT_BOLT_NAME = "Unnamed Bolt";
/** The wrapped Storm {@link IRichBolt bolt}. */
protected final IRichBolt bolt;
/** The name of the bolt. */
private final String name;
/** Number of attributes of the bolt's output tuples per stream. */
protected final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
protected final Fields inputSchema;
/** The original Storm topology. */
protected StormTopology stormTopology;
/** The topology context of the bolt */
protected transient TopologyContext topologyContext;
private final HashMap<String, Integer> numberOfAttributes;
/** The component id of the input stream for this bolt */
protected final String inputComponentId;
/** The stream id of the input stream for this bolt */
protected final String inputStreamId;
/** The original Storm topology. */
private StormTopology stormTopology;
/** The topology context of the bolt. */
private transient TopologyContext topologyContext;
public final static String DEFAULT_OPERATOR_ID = "defaultID";
public final static String DEFUALT_BOLT_NAME = "defaultBoltName";
/** The stream ID of the input stream for this bolt. */
private final String inputStreamId;
/** The component ID of the input stream for this bolt. */
private final String inputComponentId;
/** The schema (ie, ordered field names) of the input stream. */
private final Fields inputSchema;
* We have to use this because Operators must output
* {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
* We have to use this because Operators must output {@link StreamRecord}.
protected transient TimestampedCollector<OUT> flinkCollector;
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
* for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible for
* POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
* declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
* If the number of declared output attributes is not with range [0;25].
......@@ -95,13 +94,15 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
* the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException
* @throws IllegalArgumentException
* If the number of declared output attributes is not with range [0;25].
......@@ -111,18 +112,21 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
* for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
* of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible for
* POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the bolt's
* number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of
* {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not within range [1;25].
* {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
......@@ -130,13 +134,14 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
* for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
* of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible for
* POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the bolt's
* number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of
* {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
......@@ -151,13 +156,14 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
......@@ -168,22 +174,24 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final String[] rawOutputs) throws IllegalArgumentException {
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs)
throws IllegalArgumentException {
this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* The schema (ie, ordered field names) of the input stream. @throws IllegalArgumentException If
* {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
......@@ -193,8 +201,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* [0;25].
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, DEFUALT_BOLT_NAME, Utils.DEFAULT_STREAM_ID, DEFAULT_OPERATOR_ID, inputSchema, rawOutputs);
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, DEFUALT_BOLT_NAME, Utils.DEFAULT_STREAM_ID, DEFAULT_ID, inputSchema, rawOutputs);
......@@ -204,10 +212,14 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param name The name of the bolt.
* @param inputStreamId The stream id of the input stream for this bolt
* @param inputComponentId The component id of the input stream for this bolt
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param name
* The name of the bolt.
* @param inputStreamId
* The stream id of the input stream for this bolt
* @param inputComponentId
* The component id of the input stream for this bolt
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
......@@ -218,9 +230,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
public BoltWrapper(final IRichBolt bolt, final String name,
final String inputStreamId, final String inputComponentId,
final Fields inputSchema, final Collection<String> rawOutputs) throws IllegalArgumentException {
public BoltWrapper(final IRichBolt bolt, final String name, final String inputStreamId,
final String inputComponentId, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this.bolt = bolt;
this.name = name;
this.inputComponentId = inputComponentId;
......@@ -243,9 +255,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
public void open() throws Exception {
this.flinkCollector = new TimestampedCollector<>(output);
this.flinkCollector = new TimestampedCollector<>(this.output);
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
this.numberOfAttributes, this.flinkCollector));
GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
......@@ -258,9 +270,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
topologyContext = WrapperSetupHelper.createTopologyContext(
this.topologyContext = WrapperSetupHelper.createTopologyContext(
getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
this.bolt.prepare(stormConfig, topologyContext, stormCollector);
this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
......@@ -272,7 +284,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
public void processElement(final StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
this.bolt.execute(new StormTuple<>(value, inputSchema, topologyContext.getThisTaskId(), inputStreamId, inputComponentId));
this.bolt.execute(new StormTuple<>(value, this.inputSchema, this.topologyContext
.getThisTaskId(), this.inputStreamId, this.inputComponentId));
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.wrappers;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
* A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
* program. In contrast to {@link BoltWrapper}, this wrapper takes two input stream as input.
public class BoltWrapperTwoInput<IN1, IN2, OUT> extends BoltWrapper<IN1, OUT> implements TwoInputStreamOperator<IN1, IN2, OUT> {
/** The schema (ie, ordered field names) of the second input stream. */
private final Fields inputSchema2;
/** The component id of the second input stream of the bolt */
private final String componentId2;
/** The stream id of the second input stream of the bolt */
private final String streamId2;
* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param boltId The name of the bolt.
* @param streamId1 The stream id of the second input stream for this bolt
* @param componentId2 The component id of the second input stream for this bolt
* @param inputSchema1
* The schema (ie, ordered field names) of the input stream.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* */
public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
final String streamId1, final String streamId2,
final String componentId1, final String componentId2,
final Fields inputSchema1, final Fields inputSchema2) throws IllegalArgumentException {
this(bolt, boltId, streamId1, streamId2, componentId1, componentId2, inputSchema1, inputSchema2, null);
* Instantiates a new {@link BoltWrapperTwoInput} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
* @param bolt The Storm {@link IRichBolt bolt} to be used.
* @param boltId The name of the bolt.
* @param streamId1 The stream id of the first input stream for this bolt
* @param streamId2 The stream id of the first input stream for this bolt
* @param componentId1 The component id of the first input stream for this bolt
* @param componentId2 The component id of the second input stream for this bolt
* @param inputSchema1
* The schema (ie, ordered field names) of the first input stream.
* @param inputSchema2
* The schema (ie, ordered field names) of the second input stream.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
final String streamId1, final String streamId2,
final String componentId1, final String componentId2,
final Fields inputSchema1, final Fields inputSchema2,
final Collection<String> rawOutputs) throws IllegalArgumentException {
super(bolt, boltId, streamId1, componentId1, inputSchema1, rawOutputs);
this.componentId2 = componentId2;
this.streamId2 = streamId2;
this.inputSchema2 = inputSchema2;
* Sets the original Storm topology.
* @param stormTopology
* The original Storm topology.
public void setStormTopology(StormTopology stormTopology) {
this.stormTopology = stormTopology;
public void processElement1(final StreamRecord<IN1> element) throws Exception {
public void processElement2(StreamRecord<IN2> element) throws Exception {
IN2 value = element.getValue();
this.bolt.execute(new StormTuple<>(value, inputSchema2, topologyContext.getThisTaskId(), streamId2, componentId2));
public void processWatermark1(Watermark mark) throws Exception {
public void processWatermark2(Watermark mark) throws Exception {
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.storm.wrappers;
import backtype.storm.topology.IRichBolt;
import com.google.common.collect.Sets;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
* A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It
* can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream
* already.
public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTuple<IN>, OUT> {
private static final long serialVersionUID = 6399319187892878545L;
* Instantiates a new {@link MergedInputsBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
* can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to {@link Tuple25}
* depending on the bolt's declared number of attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
* If the number of declared output attributes is not with range [0;25].
public MergedInputsBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
* Instantiates a new {@link MergedInputsBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
* can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of
* attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
super(bolt, Sets.newHashSet(rawOutputs));
* Instantiates a new {@link MergedInputsBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
* can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of
* attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [1;25].
public MergedInputsBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
throws IllegalArgumentException {
super(bolt, rawOutputs);
* Instantiates a new {@link MergedInputsBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
* can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of
* attributes.
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param name
* The name of the bolt.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
public MergedInputsBoltWrapper(final IRichBolt bolt, final String name, final Collection<String> rawOutputs)
throws IllegalArgumentException {
super(bolt, name, null, null, null, rawOutputs);
public void processElement(final StreamRecord<StormTuple<IN>> element) throws Exception {
......@@ -39,7 +39,7 @@ import java.util.List;
* {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
class StormTuple<IN> implements backtype.storm.tuple.Tuple {
public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
/** The Storm representation of the original Flink tuple */
private final Values stormTuple;
......@@ -60,8 +60,8 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
* @param flinkTuple the Flink tuple
* @param schema The schema of the storm fields
StormTuple(final IN flinkTuple, final Fields schema) {
this(flinkTuple, schema, -1, Utils.DEFAULT_STREAM_ID, BoltWrapper.DEFAULT_OPERATOR_ID);
public StormTuple(final IN flinkTuple, final Fields schema) {
this(flinkTuple, schema, -1, Utils.DEFAULT_STREAM_ID, BoltWrapper.DEFAULT_ID);
......@@ -71,7 +71,7 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
* @param schema The schema (ie, ordered field names) of the tuple.
* @param producerComponentId The component id of the producer.
StormTuple(final IN flinkTuple, final Fields schema, int taskId, String producerStreamId, String producerComponentId) {
public StormTuple(final IN flinkTuple, final Fields schema, int taskId, String producerStreamId, String producerComponentId) {
if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
this.schema = schema;
final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册