提交 e134d275 编写于 作者: C Chen Qin 提交者: Aljoscha Krettek

[FLINK-4460] Add support for side outputs

This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.
上级 f31a55e0
......@@ -46,7 +46,14 @@ public abstract class TypeHint<T> {
public TypeHint() {
this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
}
/**
* Creates a hint for the generic type in the class signature.
*/
public TypeHint(Class<?> baseClass, Object instance, int genericParameterPos) {
this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
}
// ------------------------------------------------------------------------
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.util;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* An {@link OutputTag} is a typed and named tag to use for tagging side outputs
* of an operator.
*
* <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
* a {@link TypeInformation} for the generic type parameter.
*
* <p>Example:
* <pre>{@code
* OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
* }</pre>
*
* @param <T> the type of elements in the side-output stream.
*/
@PublicEvolving
public class OutputTag<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Name of the side output; the only state considered by equals() and hashCode(). */
    private final String id;

    /**
     * Type information for the elements of the side output. The field is transient, so it is
     * {@code null} on every instance that was obtained through Java deserialization.
     */
    private transient TypeInformation<T> typeInfo;

    /**
     * Creates a new named {@code OutputTag} with the given id.
     *
     * <p>The element type is derived from the runtime class of this instance via a
     * {@link TypeHint}, which is why the tag has to be created as an anonymous subclass.
     *
     * @param id The id of the created {@code OutputTag}; must not be {@code null}.
     * @throws InvalidTypesException if no {@link TypeInformation} can be determined for
     *         the generic type parameter
     */
    public OutputTag(String id) {
        this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");

        try {
            TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
            this.typeInfo = typeHint.getTypeInfo();
        } catch (InvalidTypesException e) {
            throw new InvalidTypesException("Could not determine TypeInformation for generic " +
                "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
        }
    }

    /**
     * Restores the non-transient state. Transient fields are already left at their default
     * value by {@code defaultReadObject()}; the explicit reset only makes it obvious that a
     * deserialized tag carries no type information.
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        typeInfo = null;
    }

    /**
     * Returns the id of this tag.
     */
    public String getId() {
        return id;
    }

    /**
     * Returns the type information of the side-output elements.
     *
     * @return the type information, or {@code null} if this instance was deserialized
     */
    public TypeInformation<T> getTypeInfo() {
        return typeInfo;
    }

    /**
     * Two tags are equal when their ids are equal; the element type is not compared.
     */
    @Override
    public boolean equals(Object obj) {
        // Wildcard cast instead of a raw type: only the id is read from the other tag.
        return obj instanceof OutputTag
            && ((OutputTag<?>) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "OutputTag(" + getTypeInfo() + ", " + id + ")";
    }
}
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom;
......@@ -138,6 +139,11 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
}
}
/**
* Side outputs are not supported by this output; every call fails.
*
* @throws UnsupportedOperationException always
*/
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
}
@Override
public void close() {
for (Output<StreamRecord<OUT>> out : allOutputs) {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -27,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;
......@@ -416,4 +418,16 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
/**
 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
 * into the side output with the given {@link OutputTag}.
 *
 * @param sideOutputTag the tag identifying the side output to retrieve; must not be {@code null}
 * @param <X> the type of the elements in the side-output stream
 * @return a stream backed by a {@link SideOutputTransformation} on top of this operator's
 *         transformation
 * @throws NullPointerException if {@code sideOutputTag} is {@code null}
 *
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    // Reject a null tag up front instead of handing it to clean() first.
    sideOutputTag = clean(requireNonNull(sideOutputTag));

    SideOutputTransformation<X> sideOutputTransformation =
        new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
    return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
}
}
......@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
@Internal
public class StreamConfig implements Serializable {
......@@ -63,6 +65,7 @@ public class StreamConfig implements Serializable {
private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
private static final String ITERATON_WAIT = "iterationWait";
private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
private static final String EDGES_IN_ORDER = "edgesInOrder";
......@@ -139,6 +142,10 @@ public class StreamConfig implements Serializable {
/**
* Stores the serializer for the main output under the {@code TYPE_SERIALIZER_OUT_1} key.
*
* @param serializer the serializer to store
*/
public void setTypeSerializerOut(TypeSerializer<?> serializer) {
setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
}
/**
 * Stores the serializer for the side output identified by the given tag. The config key is
 * {@code TYPE_SERIALIZER_SIDEOUT_PREFIX} followed by the id of the tag.
 *
 * @param outputTag the tag of the side output; must not be {@code null}
 * @param serializer the serializer for the elements of that side output
 */
public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
    // Same explicit check (and message) as getTypeSerializerSideOut, instead of an
    // anonymous NullPointerException from outputTag.getId().
    Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
    setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
}
public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
try {
......@@ -155,7 +162,7 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate serializer.", e);
}
}
public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
......@@ -164,6 +171,15 @@ public class StreamConfig implements Serializable {
}
}
/**
 * Reads the serializer that was stored for the side output identified by the given tag.
 *
 * @param outputTag the tag of the side output; must not be {@code null}
 * @param cl the class loader passed on to {@code InstantiationUtil.readObjectFromConfig}
 * @return the serializer read from the config
 * @throws StreamTaskException if reading the serializer from the config fails
 */
public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
    Preconditions.checkNotNull(outputTag, "Side output id must not be null.");

    try {
        final String configKey = TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId();
        return InstantiationUtil.readObjectFromConfig(this.config, configKey, cl);
    } catch (Exception e) {
        throw new StreamTaskException("Could not instantiate serializer.", e);
    }
}
private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
try {
InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
......
......@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.OutputTag;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
/**
......@@ -48,15 +49,25 @@ public class StreamEdge implements Serializable {
* output selection).
*/
private final List<String> selectedNames;
/**
* The side-output tag (if any) of this {@link StreamEdge}.
*/
private final OutputTag outputTag;
/**
* The {@link StreamPartitioner} on this {@link StreamEdge}.
*/
private StreamPartitioner<?> outputPartitioner;
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;
this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+ "_" + outputPartitioner;
......@@ -86,6 +97,8 @@ public class StreamEdge implements Serializable {
return selectedNames;
}
/**
 * Returns the side-output tag of this edge.
 *
 * @return the tag that was passed to the constructor; {@code null} when no tag was given
 */
public OutputTag getOutputTag() {
    return this.outputTag;
}
/**
* Returns the {@link StreamPartitioner} on this edge.
*/
public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
}
......@@ -117,6 +130,6 @@ public class StreamEdge implements Serializable {
public String toString() {
return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ ')';
+ ", outputTag=" + outputTag + ')';
}
}
......@@ -32,6 +32,7 @@ import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -85,6 +86,7 @@ public class StreamGraph extends StreamingPlan {
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
protected Map<Integer, String> vertexIDtoBrokerID;
......@@ -110,6 +112,7 @@ public class StreamGraph extends StreamingPlan {
public void clear() {
streamNodes = new HashMap<>();
virtualSelectNodes = new HashMap<>();
virtualSideOutputNodes = new HashMap<>();
virtualPartitionNodes = new HashMap<>();
vertexIDtoBrokerID = new HashMap<>();
vertexIDtoLoopTimeout = new HashMap<>();
......@@ -293,6 +296,23 @@ public class StreamGraph extends StreamingPlan {
new Tuple2<Integer, List<String>>(originalId, selectedNames));
}
/**
 * Registers a virtual node through which a downstream vertex is connected to only the
 * side output of the original node that is selected by the given {@link OutputTag}.
 *
 * @param originalId ID of the node that should be connected to.
 * @param virtualId ID of the virtual node.
 * @param outputTag The selected side-output {@code OutputTag}.
 * @throws IllegalStateException if a virtual side-output node with {@code virtualId} is
 *         already registered
 */
public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, OutputTag outputTag) {
    final boolean idAlreadyTaken = virtualSideOutputNodes.containsKey(virtualId);
    if (idAlreadyTaken) {
        throw new IllegalStateException("Already has virtual output node with id " + virtualId);
    }

    final Tuple2<Integer, OutputTag> originalIdAndTag = new Tuple2<>(originalId, outputTag);
    virtualSideOutputNodes.put(virtualId, originalIdAndTag);
}
/**
* Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
* partitioning.
......@@ -318,7 +338,10 @@ public class StreamGraph extends StreamingPlan {
* Determines the slot sharing group of an operation across virtual nodes.
*/
public String getSlotSharingGroup(Integer id) {
if (virtualSelectNodes.containsKey(id)) {
if (virtualSideOutputNodes.containsKey(id)) {
Integer mappedId = virtualSideOutputNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
} else if (virtualSelectNodes.containsKey(id)) {
Integer mappedId = virtualSelectNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
} else if (virtualPartitionNodes.containsKey(id)) {
......@@ -335,7 +358,7 @@ public class StreamGraph extends StreamingPlan {
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>());
new ArrayList<String>(), null);
}
......@@ -343,24 +366,31 @@ public class StreamGraph extends StreamingPlan {
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames) {
List<String> outputNames,
OutputTag outputTag) {
if (virtualSelectNodes.containsKey(upStreamVertexID)) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
......@@ -382,7 +412,7 @@ public class StreamGraph extends StreamingPlan {
}
}
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SelectTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SplitTransformation;
......@@ -184,6 +185,8 @@ public class StreamGraphGenerator {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
......@@ -301,6 +304,35 @@ public class StreamGraphGenerator {
return virtualResultIds;
}
/**
 * Transforms a {@code SideOutputTransformation}.
 *
 * <p>For every node ID produced by the input transformation, a virtual node holding the
 * side-output {@link org.apache.flink.util.OutputTag} is registered in the
 * {@code StreamGraph}.
 *
 * @param sideOutput the side-output transformation to translate
 * @return the IDs of the newly registered virtual nodes, or the IDs already recorded for
 *         {@code sideOutput} if the recursive call translated it in the meantime
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
    Collection<Integer> upstreamIds = transform(sideOutput.getInput());

    // the recursive transform might have already transformed this
    if (alreadyTransformed.containsKey(sideOutput)) {
        return alreadyTransformed.get(sideOutput);
    }

    List<Integer> virtualIds = new ArrayList<>();
    for (int upstreamId : upstreamIds) {
        int freshVirtualId = StreamTransformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(upstreamId, freshVirtualId, sideOutput.getOutputTag());
        virtualIds.add(freshVirtualId);
    }
    return virtualIds;
}
/**
* Transforms a {@code FeedbackTransformation}.
*
......
......@@ -374,6 +374,25 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
config.setTypeSerializerOut(vertex.getTypeSerializerOut());
// iterate over all out-edges, find the side-output edges, and create and store a serializer for the type of each OutputTag
for (StreamEdge edge : chainableOutputs) {
if (edge.getOutputTag() != null) {
config.setTypeSerializerSideOut(
edge.getOutputTag(),
edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
);
}
}
for (StreamEdge edge : nonChainableOutputs) {
if (edge.getOutputTag() != null) {
config.setTypeSerializerSideOut(
edge.getOutputTag(),
edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
);
}
}
config.setStreamOperator(vertex.getOperator());
config.setOutputSelectors(vertex.getOutputSelectors());
......@@ -469,6 +488,7 @@ public class StreamingJobGraphGenerator {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& edge.getOutputTag() == null // disable chaining for side outputs
&& streamGraph.isChainingEnabled();
}
......
......@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -861,6 +862,12 @@ public abstract class AbstractStreamOperator<OUT>
output.collect(record);
}
/**
* Increments the {@code numRecordsOut} counter and then forwards the record, together with
* its tag, to the wrapped output.
*
* @param outputTag the tag identifying the side output
* @param record the record to emit
*/
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
@Override
public void close() {
output.close();
......
......@@ -20,7 +20,9 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
......@@ -41,5 +43,12 @@ public interface Output<T> extends Collector<T> {
*/
void emitWatermark(Watermark mark);
/**
* Emits a record to the side output identified by the given {@link OutputTag}.
*
* @param outputTag The tag identifying the side output to emit to.
* @param record The record to collect.
* @param <X> The type of the value carried by the record.
*/
<X> void collect(OutputTag<?> outputTag, StreamRecord<X> record);
void emitLatencyMarker(LatencyMarker latencyMarker);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.transformations;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.Lists;
import org.apache.flink.util.OutputTag;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import java.util.Collection;
import java.util.List;
/**
* This transformation represents a selection of a side output of an upstream operation with a
* given {@link OutputTag}.
*
* <p>This does not create a physical operation, it only affects how upstream operations are
* connected to downstream operations.
*
* @param <T> The type of the elements that result from this {@code SideOutputTransformation}
*/
public class SideOutputTransformation<T> extends StreamTransformation<T> {

    /** The upstream transformation whose side output is selected. */
    private final StreamTransformation<?> input;

    /** The tag that selects the side output. */
    private final OutputTag<T> tag;

    /**
     * Creates a new {@code SideOutputTransformation} on top of the given input. The output
     * type is taken from the tag and the parallelism from the input.
     *
     * @param input the upstream transformation; must not be {@code null}
     * @param tag the tag that selects the side output; must not be {@code null}
     * @throws NullPointerException if {@code input} or {@code tag} is {@code null}
     */
    public SideOutputTransformation(StreamTransformation<?> input, final OutputTag<T> tag) {
        // Both arguments are dereferenced inside the super(...) call, so the null checks
        // have to happen there; a check after super(...) could never trigger.
        super(
            "SideOutput",
            requireNonNull(tag, "tag must not be null").getTypeInfo(),
            requireNonNull(input, "input must not be null").getParallelism());
        this.input = input;
        this.tag = tag;
    }

    /**
     * Returns the input {@code StreamTransformation}.
     */
    public StreamTransformation<?> getInput() {
        return input;
    }

    /**
     * Returns the {@link OutputTag} that selects the side output.
     */
    public OutputTag<T> getOutputTag() {
        return tag;
    }

    /**
     * Returns this transformation followed by the transitive predecessors of its input.
     */
    @Override
    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
        List<StreamTransformation<?>> result = Lists.newArrayList();
        result.add(this);
        result.addAll(input.getTransitivePredecessors());
        return result;
    }

    /**
     * Not supported for this transformation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        throw new UnsupportedOperationException("Cannot set chaining strategy on SideOutput Transformation.");
    }
}
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
......@@ -28,9 +29,9 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -46,15 +47,18 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private SerializationDelegate<StreamElement> serializationDelegate;
private final StreamStatusProvider streamStatusProvider;
private final OutputTag outputTag;
@SuppressWarnings("unchecked")
public RecordWriterOutput(
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
checkNotNull(recordWriter);
this.outputTag = outputTag;
// generic hack: cast the writer to generic Object type so we can use it
// with multiplexed records and watermarks
this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
......@@ -72,6 +76,26 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
/**
 * Emits a main-output record. A writer that was created with a side-output tag drops the
 * record, because it only serves that side output.
 */
@Override
public void collect(StreamRecord<OUT> record) {
    // only a writer without a tag is responsible for the main output
    if (this.outputTag == null) {
        pushToRecordWriter(record);
    }
}
/**
 * Emits a side-output record. The record is written only if this writer was created with
 * a tag and that tag equals the given one; otherwise it is dropped.
 */
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
    final boolean servesThisSideOutput =
        this.outputTag != null && this.outputTag.equals(outputTag);

    if (servesThisSideOutput) {
        pushToRecordWriter(record);
    }
}
private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
......@@ -350,9 +351,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
private <T> RecordWriterOutput<T> createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
String taskName)
{
TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
String taskName) {
OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
TypeSerializer outSerializer = null;
if (edge.getOutputTag() != null) {
// side output
outSerializer =
upStreamConfig.getTypeSerializerSideOut(
edge.getOutputTag(),
taskEnvironment.getUserClassLoader());
} else {
// main output
outSerializer =
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
}
@SuppressWarnings("unchecked")
StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
......@@ -369,11 +383,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
return new RecordWriterOutput<>(output, outSerializer, this);
return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}
// ------------------------------------------------------------------------
......@@ -405,6 +419,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
/**
* Deliberately does nothing: records emitted to a side output are dropped by this output.
*
* <p>NOTE(review): presumably safe because edges that carry an output tag are never chained
* (see the chaining check in StreamingJobGraphGenerator) - confirm for the enclosing class.
*/
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
// ignore
}
@Override
public void emitWatermark(Watermark mark) {
try {
......@@ -457,8 +476,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
}
catch (Exception e) {
} catch (Exception e) {
throw new RuntimeException("Could not forward element to next operator", e);
}
}
......@@ -471,7 +489,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
private final Random RNG = new XORShiftRandom();
private final StreamStatusProvider streamStatusProvider;
public BroadcastingOutputCollector(
Output<StreamRecord<T>>[] outputs,
StreamStatusProvider streamStatusProvider) {
......@@ -507,6 +525,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
/**
 * Hands the side-output record, together with its tag, to every wrapped output.
 */
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
    for (Output<StreamRecord<T>> wrappedOutput : outputs) {
        wrappedOutput.collect(outputTag, record);
    }
}
@Override
public void close() {
for (Output<StreamRecord<T>> output : outputs) {
......
......@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -121,6 +122,12 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
}
}
/**
* Side outputs are not supported by this output; every call fails.
*
* @throws UnsupportedOperationException always
*/
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
throw new UnsupportedOperationException("Side outputs not used in iteration tail");
}
/**
* No-op: this method performs no work on close.
*/
@Override
public void close() {
}
......
......@@ -270,6 +270,7 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 1, null, null, null, null, null),
0,
Collections.<String>emptyList(),
null,
null
)));
......@@ -281,6 +282,7 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 2, null, null, null, null, null),
0,
Collections.<String>emptyList(),
null,
null
)));
......@@ -290,7 +292,8 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 3, null, null, null, null, null),
0,
Collections.<String>emptyList(),
new BroadcastPartitioner<Object>()));
new BroadcastPartitioner<Object>(),
null));
tailOperatorConfig.setStreamOperator(tailOperator);
tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
......@@ -641,6 +644,7 @@ public class OneInputStreamTaskTest extends TestLogger {
),
0,
Collections.<String>emptyList(),
null,
null
);
......
......@@ -153,7 +153,8 @@ public class StreamTaskTestHarness<OUT> {
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
streamConfig.setOutEdgesInOrder(outEdgesInOrder);
streamConfig.setNonChainedOutputs(outEdgesInOrder);
......
......@@ -128,7 +128,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
targetVertexDummy,
1,
new LinkedList<String>(),
new BroadcastPartitioner<Object>());
new BroadcastPartitioner<Object>(),
null /* output tag */);
inPhysicalEdges.add(streamEdge);
break;
......@@ -143,7 +144,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
targetVertexDummy,
2,
new LinkedList<String>(),
new BroadcastPartitioner<Object>());
new BroadcastPartitioner<Object>(),
null /* output tag */);
inPhysicalEdges.add(streamEdge);
break;
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
......@@ -70,8 +71,10 @@ import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.mockito.Matchers.any;
......@@ -88,6 +91,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
// Elements the operator emits on its main output are appended here.
final protected ConcurrentLinkedQueue<Object> outputList;
// One queue of emitted elements per side-output tag; a queue is added the first time a tag is used.
final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
// Stream configuration wrapping the task configuration of the environment.
final protected StreamConfig config;
// Execution config handed to createSerializer(...) when serializers for emitted values are built.
final protected ExecutionConfig executionConfig;
......@@ -147,6 +152,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final Environment environment) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
this.sideOutputLists = new HashMap<>();
Configuration underlyingConfig = environment.getTaskConfiguration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
......@@ -263,6 +270,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
return outputList;
}
/**
 * Returns the elements that were emitted to the side output identified by {@code tag}.
 *
 * @param tag the tag of the side output to look up
 * @return the queue registered for {@code tag}, or {@code null} if no queue has been
 *         registered for that tag
 */
public ConcurrentLinkedQueue<Object> getSideOutput(OutputTag<?> tag) {
	return sideOutputLists.get(tag);
}
/**
* Get only the {@link StreamRecord StreamRecords} emitted by the operator.
*/
......@@ -610,6 +621,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
// Serializer used to copy main-output values before they are stored in the output list.
private TypeSerializer<OUT> outputSerializer;
// Serializer used to copy side-output values; it is re-created from the emitted value on every
// side-output collect call, which is why it is declared without a type argument.
private TypeSerializer sideOutputSerializer;
/**
 * Creates a {@code MockOutput} by delegating to the one-argument constructor with {@code null}.
 *
 * <p>NOTE(review): the argument is presumably the output serializer, which would then be
 * derived from the first emitted element; confirm against the one-argument constructor.
 */
MockOutput() {
this(null);
}
......@@ -634,12 +647,29 @@ public class AbstractStreamOperatorTestHarness<OUT> {
outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
}
if (element.hasTimestamp()) {
outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()), element.getTimestamp()));
} else {
outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue())));
}
}
/**
 * Stores a copy of {@code record} in the queue that belongs to {@code outputTag}.
 *
 * <p>A serializer is derived from the record's value on every call and used to copy that
 * value. The queue for the tag is created and registered on first use. The stored
 * {@link StreamRecord} carries the original timestamp only if the incoming record has one.
 *
 * @param outputTag the side output the record is emitted to
 * @param record the record to copy and store
 */
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
	final X value = record.getValue();

	// The serializer is resolved first so that a failure here leaves the side-output map untouched.
	sideOutputSerializer = TypeExtractor.getForObject(value).createSerializer(executionConfig);

	ConcurrentLinkedQueue<Object> taggedRecords = sideOutputLists.get(outputTag);
	if (taggedRecords == null) {
		taggedRecords = new ConcurrentLinkedQueue<>();
		sideOutputLists.put(outputTag, taggedRecords);
	}

	final Object copiedValue = sideOutputSerializer.copy(value);
	final StreamRecord<Object> stored;
	if (!record.hasTimestamp()) {
		stored = new StreamRecord<Object>(copiedValue);
	} else {
		stored = new StreamRecord<Object>(copiedValue, record.getTimestamp());
	}
	taggedRecords.add(stored);
}
@Override
public void close() {
// ignore
......
......@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import java.io.Serializable;
import java.util.List;
......@@ -52,6 +53,11 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
list.add(record.copy(copied));
}
/**
 * Side outputs are not available on this output; every call fails.
 *
 * @param outputTag ignored
 * @param record ignored
 * @throws UnsupportedOperationException always
 */
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
	final String reason = "Side output not supported for CollectorOutput";
	throw new UnsupportedOperationException(reason);
}
/** No-op: the body is empty, so closing this output does nothing. */
@Override
public void close() {}
}
......@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
public class MockOutput<T> implements Output<StreamRecord<T>> {
private Collection<T> outputs;
......@@ -39,6 +40,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
outputs.add(copied);
}
/**
 * Side outputs are not available on this output; every call fails.
 *
 * @param outputTag ignored
 * @param record ignored
 * @throws UnsupportedOperationException always
 */
@Override
public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
	final String reason = "Side output not supported for MockOutput";
	throw new UnsupportedOperationException(reason);
}
@Override
public void emitWatermark(Watermark mark) {
throw new RuntimeException("THIS MUST BE IMPLEMENTED");
......
......@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
import org.apache.flink.util.{Collector, OutputTag}
import scala.collection.JavaConverters._
......@@ -239,6 +239,12 @@ class DataStream[T](stream: JavaStream[T]) {
this
}
/**
 * Gets the [[DataStream]] of elements emitted to the side output identified by `tag`.
 *
 * The lookup is delegated to the underlying Java stream, which must be a
 * `SingleOutputStreamOperator`.
 *
 * NOTE(review): the context bound `X: OutputTag` requires an implicit `OutputTag[X]` at the
 * call site; a `TypeInformation` bound may have been intended. Confirm before changing it,
 * since doing so alters the signature.
 *
 * @param tag the tag of the side output to retrieve
 * @return the side output wrapped as a Scala [[DataStream]]
 * @throws UnsupportedOperationException if the underlying stream is not a
 *                                       `SingleOutputStreamOperator`
 */
@PublicEvolving
def getSideOutput[X: OutputTag](tag: OutputTag[X]): DataStream[X] = javaStream match {
  case stream : SingleOutputStreamOperator[X] =>
    asScalaStream(stream.getSideOutput(tag: OutputTag[X]))
  // Fail with a descriptive error instead of a bare scala.MatchError.
  case _ =>
    throw new UnsupportedOperationException("Side outputs are only supported for operators.")
}
/**
* Sets a user-provided hash for this operator. This will be used AS IS to create
* the JobVertexID.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册