提交 28c6254e 编写于 作者: S Stephan Ewen

[hotfix] Cleanup routing of records in OperatorChain

上级 e9c83ea2
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.collector.selector;
import java.util.ArrayList;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
private static final long serialVersionUID = 1L;
private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
public BroadcastOutputSelectorWrapper() {
outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
}
@Override
public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
outputs.add(output);
}
@Override
public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
return outputs;
}
}
......@@ -18,80 +18,113 @@
package org.apache.flink.streaming.api.collector.selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
private List<OutputSelector<OUT>> outputSelectors;
private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
private final OutputSelector<OUT>[] outputSelectors;
public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
this.outputSelectors = outputSelectors;
this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
}
private final Output<StreamRecord<OUT>>[] selectAllOutputs;
@Override
public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
List<String> selectedNames = edge.getSelectedNames();
private final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap;
private final Output<StreamRecord<OUT>>[] allOutputs;
if (selectedNames.isEmpty()) {
selectAllOutputs.add(output);
@SuppressWarnings({"unchecked", "rawtypes"})
public DirectedOutput(
List<OutputSelector<OUT>> outputSelectors,
List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs)
{
this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);
this.allOutputs = new Output[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
allOutputs[i] = outputs.get(i).f0;
}
else {
for (String selectedName : selectedNames) {
if (!outputMap.containsKey(selectedName)) {
outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
outputMap.get(selectedName).add(output);
}
else {
if (!outputMap.get(selectedName).contains(output)) {
HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>();
HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) {
final Output<StreamRecord<OUT>> output = outputPair.f0;
final StreamEdge edge = outputPair.f1;
List<String> selectedNames = edge.getSelectedNames();
if (selectedNames.isEmpty()) {
selectAllOutputs.add(output);
}
else {
for (String selectedName : selectedNames) {
if (!outputMap.containsKey(selectedName)) {
outputMap.put(selectedName, new ArrayList<Output<StreamRecord<OUT>>>());
outputMap.get(selectedName).add(output);
}
else {
if (!outputMap.get(selectedName).contains(output)) {
outputMap.get(selectedName).add(output);
}
}
}
}
}
this.selectAllOutputs = selectAllOutputs.toArray(new Output[selectAllOutputs.size()]);
this.outputMap = new HashMap<>();
for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> entry : outputMap.entrySet()) {
Output<StreamRecord<OUT>>[] arr = entry.getValue().toArray(new Output[entry.getValue().size()]);
this.outputMap.put(entry.getKey(), arr);
}
}
@Override
public void emitWatermark(Watermark mark) {
for (Output<StreamRecord<OUT>> out : allOutputs) {
out.emitWatermark(mark);
}
}
@Override
public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
public void collect(StreamRecord<OUT> record) {
Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<Output<StreamRecord<OUT>>>(selectAllOutputs.length);
Collections.addAll(selectedOutputs, selectAllOutputs);
for (OutputSelector<OUT> outputSelector : outputSelectors) {
Iterable<String> outputNames = outputSelector.select(record);
Iterable<String> outputNames = outputSelector.select(record.getValue());
for (String outputName : outputNames) {
List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
try {
selectedOutputs.addAll(outputList);
} catch (NullPointerException e) {
if (LOG.isErrorEnabled()) {
String format = String.format(
"Cannot emit because no output is selected with the name: %s",
outputName);
LOG.error(format);
}
Output<StreamRecord<OUT>>[] outputList = outputMap.get(outputName);
if (outputList != null) {
Collections.addAll(selectedOutputs, outputList);
}
}
}
for (Output<StreamRecord<OUT>> out : selectedOutputs) {
out.collect(record);
}
}
return selectedOutputs;
@Override
public void close() {
for (Output<StreamRecord<OUT>> out : allOutputs) {
out.close();
}
}
}
......@@ -19,14 +19,7 @@ package org.apache.flink.streaming.api.collector.selector;
import java.io.Serializable;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
public interface OutputSelectorWrapper<OUT> extends Serializable {
public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
void sendOutputs(OUT record);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.collector.selector;
import java.util.List;
public class OutputSelectorWrapperFactory {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
if (outputSelectors.size() == 0) {
return new BroadcastOutputSelectorWrapper();
} else {
return new DirectedOutputSelectorWrapper(outputSelectors);
}
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -29,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
......@@ -38,7 +39,7 @@ import org.apache.flink.util.InstantiationUtil;
public class StreamConfig implements Serializable {
private static final long serialVersionUID = 1L;
// ------------------------------------------------------------------------
// Config Keys
// ------------------------------------------------------------------------
......@@ -191,19 +192,22 @@ public class StreamConfig implements Serializable {
}
}
public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) {
try {
InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER);
InstantiationUtil.writeObjectToConfig(outputSelectors, this.config, OUTPUT_SELECTOR_WRAPPER);
} catch (IOException e) {
throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
throw new StreamTaskException("Could not serialize output selectors", e);
}
}
public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader userCodeClassloader) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
List<OutputSelector<T>> selectors =
InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, userCodeClassloader);
return selectors == null ? Collections.<OutputSelector<T>>emptyList() : selectors;
} catch (Exception e) {
throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
throw new StreamTaskException("Could not read output selectors", e);
}
}
......
......@@ -26,15 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
/**
* Class representing the operators in the streaming programs, with all their
* properties.
*
* Class representing the operators in the streaming programs, with all their properties.
*/
public class StreamNode implements Serializable {
......@@ -168,10 +164,6 @@ public class StreamNode implements Serializable {
return outputSelectors;
}
public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
return OutputSelectorWrapperFactory.create(getOutputSelectors());
}
public void addOutputSelector(OutputSelector<?> outputSelector) {
this.outputSelectors.add(outputSelector);
}
......
......@@ -310,7 +310,7 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerOut(vertex.getTypeSerializerOut());
config.setStreamOperator(vertex.getOperator());
config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
config.setOutputSelectors(vertex.getOutputSelectors());
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
private OutputSelectorWrapper<OUT> outputSelectorWrapper;
private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
this.outputSelectorWrapper = outputSelectorWrapper;
allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
}
public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) {
outputSelectorWrapper.addCollector(output, edge);
allOutputs.add(output);
}
@Override
public void collect(StreamRecord<OUT> record) {
for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
output.collect(record);
}
}
@Override
public void emitWatermark(Watermark mark) {
for (Output<?> output : allOutputs) {
output.emitWatermark(mark);
}
}
@Override
public void close() {}
}
......@@ -24,15 +24,16 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.CollectorWrapper;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
......@@ -45,6 +46,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@code OperatorChain} contains all operators that are executed as one chain within a single
* {@link StreamTask}.
*
* @param <OUT> The type of elements accepted by the chain, i.e., the input type of the chain's
* head operator.
*/
public class OperatorChain<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
......@@ -182,15 +190,14 @@ public class OperatorChain<OUT> {
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators)
{
// We create a wrapper that will encapsulate the chained operators and network outputs
OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
wrapper.addCollector(output, outputEdge);
allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge));
}
// Create collectors for the chained outputs
......@@ -200,9 +207,37 @@ public class OperatorChain<OUT> {
Output<StreamRecord<T>> output = createChainedOperator(
containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
wrapper.addCollector(output, outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// if there are multiple outputs, or the outputs are directed, we need to
// wrap them as one output
List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
if (selectors == null || selectors.isEmpty()) {
// simple path, no selector necessary
if (allOutputs.size() == 1) {
return allOutputs.get(0).f0;
}
else {
// send to N outputs. Note that this includes teh special case
// of sending to zero outputs
@SuppressWarnings({"unchecked", "rawtypes"})
Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
for (int i = 0; i < allOutputs.size(); i++) {
asArray[i] = allOutputs.get(i).f0;
}
return new BroadcastingOutputCollector<T>(asArray);
}
}
else {
// selector present, more complex routing necessary
return new DirectedOutput<T>(selectors, allOutputs);
}
return wrapper;
}
private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
......@@ -309,7 +344,6 @@ public class OperatorChain<OUT> {
@Override
public void collect(StreamRecord<T> record) {
try {
StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
operator.setKeyContextElement1(copy);
......@@ -320,4 +354,34 @@ public class OperatorChain<OUT> {
}
}
}
private static final class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> {
private final Output<StreamRecord<T>>[] outputs;
public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
this.outputs = outputs;
}
@Override
public void emitWatermark(Watermark mark) {
for (Output<StreamRecord<T>> output : outputs) {
output.emitWatermark(mark);
}
}
@Override
public void collect(StreamRecord<T> record) {
for (Output<StreamRecord<T>> output : outputs) {
output.collect(record);
}
}
@Override
public void close() {
for (Output<StreamRecord<T>> output : outputs) {
output.close();
}
}
}
}
......@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
......@@ -26,7 +27,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleIn
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
......@@ -40,11 +40,11 @@ import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Test harness for testing a {@link StreamTask}.
*
......@@ -91,6 +91,7 @@ public class StreamTaskTestHarness<OUT> {
// input related methods only need to be implemented once, in generic form
protected int numInputGates;
protected int numInputChannelsPerGate;
@SuppressWarnings("rawtypes")
protected StreamTestSingleInputGate[] inputGates;
......@@ -128,7 +129,7 @@ public class StreamTaskTestHarness<OUT> {
mockEnv.addOutput(outputList, outputStreamRecordSerializer);
streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
streamConfig.setNumberOfOutputs(1);
StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册