From 4c46b2feee2c1ecda21e5ba62d02dfd7df73e256 Mon Sep 17 00:00:00 2001 From: mjsax Date: Fri, 6 Nov 2015 11:51:31 +0100 Subject: [PATCH] [FLINK-2861] Fields grouping on split streams fails This closes #1387 --- .../tests/StormFieldsGroupingITCase.java | 72 ++++++++++++++++ .../tests/operators/FiniteRandomSpout.java | 86 +++++++++++++++++++ .../storm/tests/operators/TaskIdBolt.java | 53 ++++++++++++ .../flink/storm/api/FlinkTopologyBuilder.java | 26 +++--- .../flink/storm/wrappers/BoltWrapper.java | 39 +++++++-- .../flink/storm/wrappers/SpoutWrapper.java | 33 ++++++- .../storm/wrappers/WrapperSetupHelper.java | 7 +- .../flink/storm/wrappers/BoltWrapperTest.java | 3 +- .../wrappers/WrapperSetupHelperTest.java | 4 +- 9 files changed, 295 insertions(+), 28 deletions(-) create mode 100644 flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java create mode 100644 flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java create mode 100644 flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java new file mode 100644 index 00000000000..24213249101 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.storm.tests;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
+import org.apache.flink.storm.tests.operators.TaskIdBolt;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.StormTestBase;
+
+public class StormFieldsGroupingITCase extends StormTestBase {
+
+	private final static String topologyId = "FieldsGrouping Test";
+	private final static String spoutId = "spout";
+	private final static String boltId = "bolt";
+	private final static String sinkId = "sink";
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
+				+ "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
+				+ "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final String[] tokens = this.resultPath.split(":");
+		final String outputFile = tokens[tokens.length - 1];
+
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
+		builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
+				spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
+		builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
+
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(10 * 1000);
+
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
new file mode 100644
index 00000000000..39072ebc197
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.storm.tests.operators; + +import java.util.Map; +import java.util.Random; + +import org.apache.flink.storm.util.FiniteSpout; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +public class FiniteRandomSpout extends BaseRichSpout implements FiniteSpout { + private static final long serialVersionUID = 6592885571932363239L; + + public static final String STREAM_PREFIX = "stream_"; + + private final Random r; + private SpoutOutputCollector collector; + private int counter; + private final String[] outputStreams; + + public FiniteRandomSpout(long seed, int counter, int numberOfOutputStreams) { + this.r = new Random(seed); + this.counter = counter; + if (numberOfOutputStreams < 1) { + this.outputStreams = new String[] { Utils.DEFAULT_STREAM_ID }; + } else { + this.outputStreams = new String[numberOfOutputStreams]; + for (int i = 0; i < this.outputStreams.length; ++i) { + this.outputStreams[i] = STREAM_PREFIX + i; + } + } + } + + public FiniteRandomSpout(long seed, int counter) { + this(seed, counter, 1); + } + + @SuppressWarnings("rawtypes") + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + for (String s : this.outputStreams) { + this.collector.emit(s, new Values(this.r.nextInt())); + } + --this.counter; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (String s : this.outputStreams) { + declarer.declareStream(s, new Fields("number")); + } + } + + @Override + public boolean reachedEnd() { + return this.counter <= 0; + } + +} diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java new file mode 100644 index 00000000000..6c5bea29d95 --- /dev/null +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.tests.operators; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +public class TaskIdBolt extends BaseRichBolt { + private static final long serialVersionUID = -7966475984592762720L; + + private OutputCollector collector; + private int thisTaskId; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + this.thisTaskId = context.getThisTaskId(); + } + + @Override + public void execute(Tuple input) { + this.collector.emit(new Values(this.thisTaskId + "> " + input.getValue(0))); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("number")); + } + +} diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java index 47aa68e17d1..42e1d68f624 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java @@ -101,7 +101,7 @@ public class FlinkTopologyBuilder { final DataStreamSource source; if (sourceStreams.size() == 1) { - final SpoutWrapper spoutWrapperSingleOutput = new SpoutWrapper(userSpout); + final SpoutWrapper spoutWrapperSingleOutput = new SpoutWrapper(userSpout, spoutId, null, null); spoutWrapperSingleOutput.setStormTopology(stormTopology); final String outputStreamId = (String) sourceStreams.keySet().toArray()[0]; @@ -113,7 +113,7 @@ public class FlinkTopologyBuilder { source = src; } else { final SpoutWrapper> spoutWrapperMultipleOutputs = new SpoutWrapper>( - userSpout); + userSpout, spoutId, null, null); spoutWrapperMultipleOutputs.setStormTopology(stormTopology); @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -124,7 +124,10 @@ public class FlinkTopologyBuilder { SplitStream> splitSource = multiSource .split(new StormStreamSelector()); for (String streamId : sourceStreams.keySet()) { - outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper())); + SingleOutputStreamOperator outStream = splitSource.select(streamId) + .map(new SplitStreamMapper()); + outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); + outputStreams.put(streamId, outStream); } source = multiSource; } @@ -230,8 +233,8 @@ public class FlinkTopologyBuilder { .getOutputType(outputStreamId); final BoltWrapper boltWrapperSingleOutput = new BoltWrapper( - userBolt, this.outputStreams.get(producerId).get( - inputStreamId)); + userBolt, boltId, this.outputStreams.get(producerId).get( + inputStreamId), null); boltWrapperSingleOutput.setStormTopology(stormTopology); final SingleOutputStreamOperator outStream = inputStream @@ -246,8 +249,8 @@ public class FlinkTopologyBuilder { outputStream = outStream; } else { final BoltWrapper> boltWrapperMultipleOutputs = new BoltWrapper>( - userBolt, this.outputStreams.get(producerId).get( - inputStreamId)); + userBolt, boltId, this.outputStreams.get(producerId).get( + inputStreamId), null); boltWrapperMultipleOutputs.setStormTopology(stormTopology); 
@SuppressWarnings({ "unchecked", "rawtypes" }) @@ -262,9 +265,12 @@ public class FlinkTopologyBuilder { final HashMap> op = new HashMap>(); for (String outputStreamId : boltOutputStreams.keySet()) { - op.put(outputStreamId, - splitStream.select(outputStreamId).map( - new SplitStreamMapper())); + SingleOutputStreamOperator outStream = splitStream + .select(outputStreamId).map( + new SplitStreamMapper()); + outStream.getTransformation().setOutputType( + declarer.getOutputType(outputStreamId)); + op.put(outputStreamId, outStream); } availableInputs.put(boltId, op); outputStream = multiStream; diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index d9a2f9196fa..ee06f0ac6e3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -54,6 +54,8 @@ public class BoltWrapper extends AbstractStreamOperator implements /** The wrapped Storm {@link IRichBolt bolt}. */ private final IRichBolt bolt; + /** The name of the bolt. */ + private final String name; /** Number of attributes of the bolt's output tuples per stream. */ private final HashMap numberOfAttributes; /** The schema (ie, ordered field names) of the input stream. */ @@ -189,7 +191,34 @@ public class BoltWrapper extends AbstractStreamOperator implements */ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final Collection rawOutputs) throws IllegalArgumentException { + this(bolt, null, inputSchema, rawOutputs); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used + * within a Flink streaming program. The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} + * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will + * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param name + * The name of the bolt. + * @param inputSchema + * The schema (ie, ordered field names) of the input stream. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. 
+	 */
+	public BoltWrapper(final IRichBolt bolt, final String name, final Fields inputSchema,
+			final Collection rawOutputs) throws IllegalArgumentException {
 		this.bolt = bolt;
+		this.name = name;
 		this.inputSchema = inputSchema;
 		this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
 	}
@@ -209,12 +238,8 @@ public class BoltWrapper extends AbstractStreamOperator implements
 		super.open();
 
 		this.flinkCollector = new TimestampedCollector(output);
-		OutputCollector stormCollector = null;
-
-		if (this.numberOfAttributes.size() > 0) {
-			stormCollector = new OutputCollector(new BoltCollector(
-					this.numberOfAttributes, flinkCollector));
-		}
+		final OutputCollector stormCollector = new OutputCollector(new BoltCollector(
+				this.numberOfAttributes, flinkCollector));
 
 		GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
 		StormConfig stormConfig = new StormConfig();
@@ -228,7 +253,7 @@ public class BoltWrapper extends AbstractStreamOperator implements
 		}
 
 		final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
-				getRuntimeContext(), this.bolt, this.stormTopology, stormConfig);
+				getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
 
 		this.bolt.prepare(stormConfig, topologyContext, stormCollector);
 	}
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 62b36beec22..f6b46463255 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -58,6 +58,8 @@ public final class SpoutWrapper extends RichParallelSourceFunction {
 	private final HashMap numberOfAttributes;
 	/** The wrapped {@link IRichSpout spout}. */
 	private final IRichSpout spout;
+	/** The name of the spout. */
+	private final String name;
 	/** The wrapper of the given Flink collector. */
 	private SpoutCollector collector;
 	/** Indicates, if the source is still running or was canceled. */
@@ -193,7 +195,36 @@ public final class SpoutWrapper extends RichParallelSourceFunction {
 	 */
 	public SpoutWrapper(final IRichSpout spout, final Collection rawOutputs,
 			final Integer numberOfInvocations) throws IllegalArgumentException {
+		this(spout, null, rawOutputs, numberOfInvocations);
+	}
+
+	/**
+	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
+	 * the given {@link IRichSpout spout} a finite number of times. The output type can be any type if parameter
+	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
+	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
+	 * number of attributes.
+	 *
+	 * @param spout
+	 *            The {@link IRichSpout spout} to be used.
+	 * @param name
+	 *            The name of the spout.
+	 * @param rawOutputs
+	 *            Contains the names of streams whose single attribute output should not be of type {@link Tuple1}
+	 *            but be of a raw type. (Can be {@code null}.)
+	 * @param numberOfInvocations
+	 *            The number of calls to {@link IRichSpout#nextTuple()}. If the value is negative, {@link SpoutWrapper}
+	 *            terminates the first time no tuple is emitted. If the value is {@code null}, finite invocation is
+	 *            disabled.
+	 * @throws IllegalArgumentException
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
+	 */
+	public SpoutWrapper(final IRichSpout spout, final String name, final Collection rawOutputs,
+			final Integer numberOfInvocations) throws IllegalArgumentException {
 		this.spout = spout;
+		this.name = name;
 		this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
 		this.numberOfInvocations = numberOfInvocations;
 	}
@@ -225,7 +256,7 @@ public final class SpoutWrapper extends RichParallelSourceFunction {
 		}
 
 		this.spout.open(stormConfig, WrapperSetupHelper.createTopologyContext(
-				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout,
+				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
 				this.stormTopology, stormConfig), new SpoutOutputCollector(this.collector));
 		this.spout.activate();
 
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index 5f1f142dfba..c4e46bfc1aa 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -112,12 +112,7 @@ class WrapperSetupHelper {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	static synchronized TopologyContext createTopologyContext(
 			final StreamingRuntimeContext context, final IComponent spoutOrBolt,
-			StormTopology stormTopology, Map stormConfig) {
-		String operatorName = context.getTaskName();
-		if (operatorName.startsWith("Source: ")) {
-			// prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here
-			operatorName = operatorName.substring(8);
-		}
+			final String operatorName, StormTopology stormTopology, final Map stormConfig) {
 		final int dop = context.getNumberOfParallelSubtasks();
 
 		final Map taskToComponents = new HashMap();
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index c1485c8fc0e..b06c2b1e200 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -54,7 +54,6 @@ import java.util.Map.Entry;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.same;
 import static org.mockito.Mockito.times;
@@ -271,7 +270,7 @@ public class BoltWrapperTest extends AbstractTest {
 		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
 		wrapper.open();
 
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 20e480df582..b13045dee4d 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -228,8 +228,8 @@ public class WrapperSetupHelperTest extends AbstractTest {
 			Config stormConfig = new Config();
 			stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
 
-			TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
-					context, operators.get(thisComponentId), stormTopology, stormConfig);
+			TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
+					operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
 
 			ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
 			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-- 
GitLab
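For context, a minimal usage sketch of the scenario this patch addresses: a spout that declares several named output streams and a bolt that subscribes to one of them with a fields grouping. The sketch is illustrative only and is not part of the patch; the component names ("mySpout", "myBolt", "fieldsGroupingExample") are made up, and it reuses the FiniteRandomSpout and TaskIdBolt test operators added above, mirroring StormFieldsGroupingITCase.

import backtype.storm.tuple.Fields;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.TaskIdBolt;

public class FieldsGroupingOnSplitStreamSketch {

	public static void main(final String[] args) throws Exception {
		// The topology is assembled exactly as with Storm's TopologyBuilder.
		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

		// A spout that declares two named output streams ("stream_0" and "stream_1").
		builder.setSpout("mySpout", new FiniteRandomSpout(0, 10, 2));

		// Fields grouping on one of the declared streams of the spout -- the
		// combination that failed before this fix, since the selected split
		// stream carried no output type information.
		builder.setBolt("myBolt", new TaskIdBolt(), 2).fieldsGrouping(
				"mySpout", FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));

		// Run on Flink's local cluster instead of a Storm cluster.
		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
		cluster.submitTopology("fieldsGroupingExample", null, builder.createTopology());
		cluster.shutdown();
	}
}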