提交 c09d14a9 编写于 作者: S Stephan Ewen

[FLINK-2636] [streaming] Create common type StreamElement for StreamRecord and Watermark

上级 655a891d
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import java.io.IOException;
......
......@@ -17,6 +17,8 @@
*/
package org.apache.flink.streaming.api.watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
/**
* A Watermark tells operators that receive it that no elements with a timestamp older or equal
* to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
......@@ -31,11 +33,11 @@ package org.apache.flink.streaming.api.watermark;
* In some cases a watermark is only a heuristic and operators should be able to deal with
* late elements. They can either discard those or update the result and emit updates/retractions
* to downstream operations.
*
*/
public class Watermark {
public class Watermark extends StreamElement {
private long timestamp;
/** The timestamp of the watermark */
private final long timestamp;
/**
* Creates a new watermark with the given timestamp.
......@@ -53,16 +55,8 @@ public class Watermark {
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Watermark watermark = (Watermark) o;
return timestamp == watermark.timestamp;
return this == o ||
o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
}
@Override
......@@ -72,8 +66,6 @@ public class Watermark {
@Override
public String toString() {
return "Watermark{" +
"timestamp=" + timestamp +
'}';
return "Watermark @ " + timestamp;
}
}
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -36,9 +37,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private StreamRecordWriter<SerializationDelegate<Object>> recordWriter;
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<Object> serializationDelegate;
private SerializationDelegate<StreamElement> serializationDelegate;
@SuppressWarnings("unchecked")
......@@ -51,19 +52,19 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
// generic hack: cast the writer to generic Object type so we can use it
// with multiplexed records and watermarks
this.recordWriter = (StreamRecordWriter<SerializationDelegate<Object>>)
this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
(StreamRecordWriter<?>) recordWriter;
TypeSerializer<Object> outRecordSerializer;
TypeSerializer<StreamElement> outRecordSerializer;
if (enableWatermarkMultiplexing) {
outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
} else {
outRecordSerializer = (TypeSerializer<Object>)
outRecordSerializer = (TypeSerializer<StreamElement>)
(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
}
if (outSerializer != null) {
serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
}
}
......
......@@ -36,6 +36,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -53,9 +54,9 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
*/
public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
......@@ -68,9 +69,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
private final long[] watermarks;
private long lastEmittedWatermark;
private final DeserializationDelegate<Object> deserializationDelegate;
private final DeserializationDelegate<StreamElement> deserializationDelegate;
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener,
CheckpointingMode checkpointMode,
......@@ -95,10 +96,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
} else {
StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
}
......@@ -106,7 +107,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
}
watermarks = new long[inputGate.getNumberOfInputChannels()];
......@@ -132,18 +133,15 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
}
if (result.isFullRecord()) {
Object recordOrWatermark = deserializationDelegate.getInstance();
StreamElement recordOrWatermark = deserializationDelegate.getInstance();
if (recordOrWatermark instanceof Watermark) {
Watermark mark = (Watermark) recordOrWatermark;
long watermarkMillis = mark.getTimestamp();
if (recordOrWatermark.isWatermark()) {
long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
if (watermarkMillis > watermarks[currentChannel]) {
watermarks[currentChannel] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE;
for (long watermark : watermarks) {
if (watermark < newMinWatermark) {
newMinWatermark = watermark;
}
newMinWatermark = Math.min(watermark, newMinWatermark);
}
if (newMinWatermark > lastEmittedWatermark) {
lastEmittedWatermark = newMinWatermark;
......@@ -154,8 +152,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
}
else {
// now we can do the actual processing
@SuppressWarnings("unchecked")
StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
StreamRecord<IN> record = recordOrWatermark.asRecord();
StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
if (ctx != null) {
ctx.setNextInput(record);
......
......@@ -35,6 +35,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -44,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
/**
......@@ -61,9 +63,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
......@@ -81,8 +83,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
private final int numInputChannels1;
private final DeserializationDelegate<Object> deserializationDelegate1;
private final DeserializationDelegate<Object> deserializationDelegate2;
private final DeserializationDelegate<StreamElement> deserializationDelegate1;
private final DeserializationDelegate<StreamElement> deserializationDelegate2;
@SuppressWarnings({"unchecked", "rawtypes"})
public StreamTwoInputProcessor(
......@@ -113,21 +115,21 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
}
else {
StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
this.deserializationDelegate1 = (DeserializationDelegate<Object>)
this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
}
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
}
else {
StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
this.deserializationDelegate2 = (DeserializationDelegate<Object>)
this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
}
......@@ -135,7 +137,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
}
// determine which unioned channels belong to input 1 and which belong to input 2
......@@ -148,15 +150,11 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
watermarks1 = new long[numInputChannels1];
for (int i = 0; i < numInputChannels1; i++) {
watermarks1[i] = Long.MIN_VALUE;
}
Arrays.fill(watermarks1, Long.MIN_VALUE);
lastEmittedWatermark1 = Long.MIN_VALUE;
watermarks2 = new long[numInputChannels2];
for (int i = 0; i < numInputChannels2; i++) {
watermarks2[i] = Long.MIN_VALUE;
}
Arrays.fill(watermarks2, Long.MIN_VALUE);
lastEmittedWatermark2 = Long.MIN_VALUE;
}
......@@ -182,22 +180,25 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
if (result.isFullRecord()) {
if (currentChannel < numInputChannels1) {
Object recordOrWatermark = deserializationDelegate1.getInstance();
if (recordOrWatermark instanceof Watermark) {
StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
if (recordOrWatermark.isWatermark()) {
handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
continue;
} else {
streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance());
}
else {
streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
return true;
}
} else {
Object recordOrWatermark = deserializationDelegate2.getInstance();
if (recordOrWatermark instanceof Watermark) {
handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
}
else {
StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
if (recordOrWatermark.isWatermark()) {
handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel);
continue;
} else {
streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance());
}
else {
streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
return true;
}
}
......@@ -234,10 +235,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
if (watermarkMillis > watermarks1[channelIndex]) {
watermarks1[channelIndex] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE;
for (long aWatermarks1 : watermarks1) {
if (aWatermarks1 < newMinWatermark) {
newMinWatermark = aWatermarks1;
}
for (long wm : watermarks1) {
newMinWatermark = Math.min(wm, newMinWatermark);
}
if (newMinWatermark > lastEmittedWatermark1) {
lastEmittedWatermark1 = newMinWatermark;
......@@ -250,10 +249,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
if (watermarkMillis > watermarks2[channelIndex]) {
watermarks2[channelIndex] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE;
for (long aWatermarks2 : watermarks2) {
if (aWatermarks2 < newMinWatermark) {
newMinWatermark = aWatermarks2;
}
for (long wm : watermarks2) {
newMinWatermark = Math.min(wm, newMinWatermark);
}
if (newMinWatermark > lastEmittedWatermark2) {
lastEmittedWatermark2 = newMinWatermark;
......
......@@ -27,22 +27,22 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import java.io.IOException;
/**
* Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
* Serializer for {@link StreamRecord} and {@link org.apache.flink.streaming.api.watermark.Watermark}. This does not behave like a normal
* {@link TypeSerializer}, instead, this is only used at the
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
* {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer
* {@link StreamRecord StreamRecords} and {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. This serializer
* can handle both of them, therefore it returns {@link Object} the result has
* to be cast to the correct type.
*
* @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
*/
public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
private static final long serialVersionUID = 1L;
private static final long IS_WATERMARK = Long.MIN_VALUE;
protected final TypeSerializer<T> typeSerializer;
private final TypeSerializer<T> typeSerializer;
public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
......@@ -59,87 +59,94 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
}
@Override
public TypeSerializer<Object> duplicate() {
return this;
public TypeSerializer<StreamElement> duplicate() {
TypeSerializer<T> copy = typeSerializer.duplicate();
return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
}
@Override
public Object createInstance() {
public StreamRecord<T> createInstance() {
return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
}
@Override
@SuppressWarnings("unchecked")
public Object copy(Object from) {
public StreamElement copy(StreamElement from) {
// we can reuse the timestamp since Instant is immutable
if (from instanceof StreamRecord) {
StreamRecord<T> fromRecord = (StreamRecord<T>) from;
if (from.isRecord()) {
StreamRecord<T> fromRecord = from.asRecord();
return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
} else if (from instanceof Watermark) {
}
else if (from.isWatermark()) {
// is immutable
return from;
} else {
}
else {
throw new RuntimeException("Cannot copy " + from);
}
}
@Override
@SuppressWarnings("unchecked")
public Object copy(Object from, Object reuse) {
if (from instanceof StreamRecord && reuse instanceof StreamRecord) {
StreamRecord<T> fromRecord = (StreamRecord<T>) from;
StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
public StreamElement copy(StreamElement from, StreamElement reuse) {
if (from.isRecord() && reuse.isRecord()) {
StreamRecord<T> fromRecord = from.asRecord();
StreamRecord<T> reuseRecord = reuse.asRecord();
reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp());
T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
return reuse;
} else if (from instanceof Watermark) {
}
else if (from.isWatermark()) {
// is immutable
return from;
} else {
throw new RuntimeException("Cannot copy " + from);
}
else {
throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
}
}
@Override
public int getLength() {
return 0;
return -1;
}
@Override
@SuppressWarnings("unchecked")
public void serialize(Object value, DataOutputView target) throws IOException {
if (value instanceof StreamRecord) {
StreamRecord<T> record = (StreamRecord<T>) value;
public void serialize(StreamElement value, DataOutputView target) throws IOException {
if (value.isRecord()) {
StreamRecord<T> record = value.asRecord();
target.writeLong(record.getTimestamp());
typeSerializer.serialize(record.getValue(), target);
} else if (value instanceof Watermark) {
}
else if (value.isWatermark()) {
target.writeLong(IS_WATERMARK);
target.writeLong(((Watermark) value).getTimestamp());
target.writeLong(value.asWatermark().getTimestamp());
}
else {
throw new RuntimeException();
}
}
@Override
public Object deserialize(DataInputView source) throws IOException {
public StreamElement deserialize(DataInputView source) throws IOException {
long millis = source.readLong();
if (millis == IS_WATERMARK) {
return new Watermark(source.readLong());
} else {
}
else {
T element = typeSerializer.deserialize(source);
return new StreamRecord<T>(element, millis);
}
}
@Override
@SuppressWarnings("unchecked")
public Object deserialize(Object reuse, DataInputView source) throws IOException {
public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
long millis = source.readLong();
if (millis == IS_WATERMARK) {
return new Watermark(source.readLong());
} else {
StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
}
else {
StreamRecord<T> reuseRecord = reuse.asRecord();
T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
reuseRecord.replace(element, millis);
return reuse;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.streamrecord;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
* An element in a data stream. Can be a record or a Watermark.
*/
public abstract class StreamElement {
/**
* Checks whether this element is a watermark.
* @return True, if this element is a watermark, false otherwise.
*/
public final boolean isWatermark() {
return getClass() == Watermark.class;
}
/**
* Checks whether this element is a record.
* @return True, if this element is a record, false otherwise.
*/
public final boolean isRecord() {
return getClass() == StreamRecord.class;
}
/**
* Casts this element into a StreamRecord.
* @return This element as a stream record.
* @throws java.lang.ClassCastException Thrown, if this element is actually not a stream record.
*/
@SuppressWarnings("unchecked")
public final <E> StreamRecord<E> asRecord() {
return (StreamRecord<E>) this;
}
/**
* Casts this element into a Watermark.
* @return This element as a Watermark.
* @throws java.lang.ClassCastException Thrown, if this element is actually not a Watermark.
*/
public final Watermark asWatermark() {
return (Watermark) this;
}
}
......@@ -22,10 +22,12 @@ package org.apache.flink.streaming.runtime.streamrecord;
*
* @param <T> The type encapsulated with the stream record.
*/
public class StreamRecord<T> {
public class StreamRecord<T> extends StreamElement {
/** The actual value held by this record */
private T value;
/** The timestamp of the record */
private long timestamp;
/**
......
......@@ -20,7 +20,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
......@@ -32,6 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -85,7 +85,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = new SerializationDelegate<Object>(new MultiplexingStreamRecordSerializer<T>(serializer));
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
......
......@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
......@@ -31,7 +30,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.HashSet;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -42,7 +41,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
/**
......
......@@ -23,19 +23,14 @@ import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
......
......@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
......@@ -79,7 +80,7 @@ public class StreamTaskTestHarness<OUT> {
private AbstractInvokable task;
private TypeSerializer<OUT> outputSerializer;
private TypeSerializer<Object> outputStreamRecordSerializer;
private TypeSerializer<StreamElement> outputStreamRecordSerializer;
private ConcurrentLinkedQueue<Object> outputList;
......@@ -119,8 +120,7 @@ public class StreamTaskTestHarness<OUT> {
/**
* This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
*/
protected void initializeInputs() throws IOException, InterruptedException {
}
protected void initializeInputs() throws IOException, InterruptedException {}
@SuppressWarnings("unchecked")
private void initializeOutput() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册