提交 4c46b2fe 编写于 作者: M mjsax 提交者: mjsax

[FLINK-2861] Fields grouping on split streams fails

This closes #1387
上级 85e7b287
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.TaskIdBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.StormTestBase;
public class StormFieldsGroupingITCase extends StormTestBase {
private final static String topologyId = "FieldsGrouping Test";
private final static String spoutId = "spout";
private final static String boltId = "bolt";
private final static String sinkId = "sink";
private String resultPath;
@Override
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
+ "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
+ "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
}
@Override
protected void testProgram() throws Exception {
final String[] tokens = this.resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
Utils.sleep(10 * 1000);
// TODO kill does no do anything so far
cluster.killTopology(topologyId);
cluster.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests.operators;
import java.util.Map;
import java.util.Random;
import org.apache.flink.storm.util.FiniteSpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class FiniteRandomSpout extends BaseRichSpout implements FiniteSpout {
private static final long serialVersionUID = 6592885571932363239L;
public static final String STREAM_PREFIX = "stream_";
private final Random r;
private SpoutOutputCollector collector;
private int counter;
private final String[] outputStreams;
public FiniteRandomSpout(long seed, int counter, int numberOfOutputStreams) {
this.r = new Random(seed);
this.counter = counter;
if (numberOfOutputStreams < 1) {
this.outputStreams = new String[] { Utils.DEFAULT_STREAM_ID };
} else {
this.outputStreams = new String[numberOfOutputStreams];
for (int i = 0; i < this.outputStreams.length; ++i) {
this.outputStreams[i] = STREAM_PREFIX + i;
}
}
}
public FiniteRandomSpout(long seed, int counter) {
this(seed, counter, 1);
}
@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
for (String s : this.outputStreams) {
this.collector.emit(s, new Values(this.r.nextInt()));
}
--this.counter;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (String s : this.outputStreams) {
declarer.declareStream(s, new Fields("number"));
}
}
@Override
public boolean reachedEnd() {
return this.counter <= 0;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests.operators;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class TaskIdBolt extends BaseRichBolt {
private static final long serialVersionUID = -7966475984592762720L;
private OutputCollector collector;
private int thisTaskId;
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.thisTaskId = context.getThisTaskId();
}
@Override
public void execute(Tuple input) {
this.collector.emit(new Values(this.thisTaskId + "> " + input.getValue(0)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
}
}
......@@ -101,7 +101,7 @@ public class FlinkTopologyBuilder {
final DataStreamSource<?> source;
if (sourceStreams.size() == 1) {
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
spoutWrapperSingleOutput.setStormTopology(stormTopology);
final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
......@@ -113,7 +113,7 @@ public class FlinkTopologyBuilder {
source = src;
} else {
final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
userSpout);
userSpout, spoutId, null, null);
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
@SuppressWarnings({ "unchecked", "rawtypes" })
......@@ -124,7 +124,10 @@ public class FlinkTopologyBuilder {
SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
.split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId)
.map(new SplitStreamMapper<Tuple>());
outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
outputStreams.put(streamId, outStream);
}
source = multiSource;
}
......@@ -230,8 +233,8 @@ public class FlinkTopologyBuilder {
.getOutputType(outputStreamId);
final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>(
userBolt, this.outputStreams.get(producerId).get(
inputStreamId));
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
boltWrapperSingleOutput.setStormTopology(stormTopology);
final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
......@@ -246,8 +249,8 @@ public class FlinkTopologyBuilder {
outputStream = outStream;
} else {
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(
userBolt, this.outputStreams.get(producerId).get(
inputStreamId));
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
boltWrapperMultipleOutputs.setStormTopology(stormTopology);
@SuppressWarnings({ "unchecked", "rawtypes" })
......@@ -262,9 +265,12 @@ public class FlinkTopologyBuilder {
final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
for (String outputStreamId : boltOutputStreams.keySet()) {
op.put(outputStreamId,
splitStream.select(outputStreamId).map(
new SplitStreamMapper<Tuple>()));
SingleOutputStreamOperator<Tuple, ?> outStream = splitStream
.select(outputStreamId).map(
new SplitStreamMapper<Tuple>());
outStream.getTransformation().setOutputType(
declarer.getOutputType(outputStreamId));
op.put(outputStreamId, outStream);
}
availableInputs.put(boltId, op);
outputStream = multiStream;
......
......@@ -54,6 +54,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
/** The wrapped Storm {@link IRichBolt bolt}. */
private final IRichBolt bolt;
/** The name of the bolt. */
private final String name;
/** Number of attributes of the bolt's output tuples per stream. */
private final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
......@@ -189,7 +191,34 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
*/
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, null, inputSchema, rawOutputs);
}
/**
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param name
* The name of the bolt.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
*/
public BoltWrapper(final IRichBolt bolt, final String name, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this.bolt = bolt;
this.name = name;
this.inputSchema = inputSchema;
this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
}
......@@ -209,12 +238,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
super.open();
this.flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;
if (this.numberOfAttributes.size() > 0) {
stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
}
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
......@@ -228,7 +253,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
}
final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
getRuntimeContext(), this.bolt, this.stormTopology, stormConfig);
getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
this.bolt.prepare(stormConfig, topologyContext, stormCollector);
}
......
......@@ -58,6 +58,8 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
private final HashMap<String, Integer> numberOfAttributes;
/** The wrapped {@link IRichSpout spout}. */
private final IRichSpout spout;
/** The name of the spout. */
private final String name;
/** The wrapper of the given Flink collector. */
private SpoutCollector<OUT> collector;
/** Indicates, if the source is still running or was canceled. */
......@@ -193,7 +195,36 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
*/
public SpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs,
final Integer numberOfInvocations) throws IllegalArgumentException {
this(spout, null, rawOutputs, numberOfInvocations);
}
/**
* Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
* the given {@link IRichSpout spout} a finite number of times. The output type can be any type if parameter
* {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
* {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
* number of attributes.
*
* @param spout
* The {@link IRichSpout spout} to be used.
* @param name
* The name of the spout.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type. (Can be {@code null}.)
* @param numberOfInvocations
* The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
* terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
* disabled.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
*/
public SpoutWrapper(final IRichSpout spout, final String name, final Collection<String> rawOutputs,
final Integer numberOfInvocations) throws IllegalArgumentException {
this.spout = spout;
this.name = name;
this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
this.numberOfInvocations = numberOfInvocations;
}
......@@ -225,7 +256,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
}
this.spout.open(stormConfig, WrapperSetupHelper.createTopologyContext(
(StreamingRuntimeContext) super.getRuntimeContext(), this.spout,
(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
this.stormTopology, stormConfig), new SpoutOutputCollector(this.collector));
this.spout.activate();
......
......@@ -112,12 +112,7 @@ class WrapperSetupHelper {
@SuppressWarnings({ "rawtypes", "unchecked" })
static synchronized TopologyContext createTopologyContext(
final StreamingRuntimeContext context, final IComponent spoutOrBolt,
StormTopology stormTopology, Map stormConfig) {
String operatorName = context.getTaskName();
if (operatorName.startsWith("Source: ")) {
// prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here
operatorName = operatorName.substring(8);
}
final String operatorName, StormTopology stormTopology, final Map stormConfig) {
final int dop = context.getNumberOfParallelSubtasks();
final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
......
......@@ -54,7 +54,6 @@ import java.util.Map.Entry;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
......@@ -271,7 +270,7 @@ public class BoltWrapperTest extends AbstractTest {
wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.open();
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
}
@SuppressWarnings("unchecked")
......
......@@ -228,8 +228,8 @@ public class WrapperSetupHelperTest extends AbstractTest {
Config stormConfig = new Config();
stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
context, operators.get(thisComponentId), stormTopology, stormConfig);
TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册