未验证 提交 3b309d3e 编写于 作者: S Stephan Ewen 提交者: Aljoscha Krettek

[FLINK-17661] Deprecate old Watermark Assigner API

We add adapters for the old AssignerWithPeriodicWatermarks and
AssignerWithPunctuatedWatermarks and use the newly introduced operator.
上级 62911db7
......@@ -60,7 +60,6 @@ import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ProcessOperator;
......@@ -86,9 +85,9 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
import org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
......@@ -915,90 +914,42 @@ public class DataStream<T> {
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
*
* <p>This method creates watermarks periodically (for example every second), based
* on the watermarks indicated by the given watermark generator. Even when no new elements
* in the stream arrive, the given watermark generator will be periodically checked for
* new watermarks. The interval in which watermarks are generated is defined in
* {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
* <p>This method uses the deprecated watermark generator interfaces. Please switch to
* {@link #assignTimestampsAndWatermarks(TimestampAssigner, WatermarkStrategy)} to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* <p>Use this method for the common cases, where some characteristic over all elements
* should generate the watermarks, or where watermarks are simply trailing behind the
* wall clock time by a certain amount.
*
* <p>For the second case and when the watermarks are required to lag behind the maximum
* timestamp seen so far in the elements of the stream by a fixed amount of time, and this
* amount is known in advance, use the
* {@link BoundedOutOfOrdernessTimestampExtractor}.
*
* <p>For cases where watermarks should be created in an irregular fashion, for example
* based on certain markers that some element carry, use the
* {@link AssignerWithPunctuatedWatermarks}.
*
* @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
* watermark generator.
* @return The stream after the transformation, with assigned timestamps and watermarks.
*
* @see AssignerWithPeriodicWatermarks
* @see AssignerWithPunctuatedWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
* @deprecated Please use {@link #assignTimestampsAndWatermarks(TimestampAssigner, WatermarkStrategy)} instead.
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
final WatermarkStrategy<T> wms = new AssignerWithPeriodicWatermarksAdapter.Strategy<>(cleanedAssigner);
TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
return assignTimestampsAndWatermarks(cleanedAssigner, wms);
}
/**
* Assigns timestamps to the elements in the data stream and creates watermarks to
* signal event time progress based on the elements themselves.
*
* <p>This method creates watermarks based purely on stream elements. For each element
* that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
* the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
* method is called, and a new watermark is emitted, if the returned watermark value is
* non-negative and greater than the previous watermark.
*
* <p>This method is useful when the data stream embeds watermark elements, or certain elements
* carry a marker that can be used to determine the current event time watermark.
* This operation gives the programmer full control over the watermark generation. Users
* should be aware that too aggressive watermark generation (i.e., generating hundreds of
* watermarks every second) can cost some performance.
*
* <p>For cases where watermarks should be created in a regular fashion, for example
* every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
*
* @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
* watermark generator.
* @return The stream after the transformation, with assigned timestamps and watermarks.
* Assigns timestamps to the elements in the data stream and creates watermarks based on events,
* to signal event time progress.
*
* @see AssignerWithPunctuatedWatermarks
* @see AssignerWithPeriodicWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
* <p>This method uses the deprecated watermark generator interfaces. Please switch to
* {@link #assignTimestampsAndWatermarks(TimestampAssigner, WatermarkStrategy)} to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(TimestampAssigner, WatermarkStrategy)} instead.
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
final WatermarkStrategy<T> wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(cleanedAssigner);
TimestampsAndPunctuatedWatermarksOperator<T> operator =
new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
return assignTimestampsAndWatermarks(cleanedAssigner, wms);
}
// ------------------------------------------------------------------------
......
......@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
@Deprecated
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
/**
......
......@@ -64,6 +64,7 @@ import javax.annotation.Nullable;
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
@Deprecated
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
/**
......
......@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
*
* @param <T> The elements that get timestamps assigned.
*/
@Deprecated
public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = -4072216356049069301L;
......
......@@ -34,6 +34,7 @@ import static java.util.Objects.requireNonNull;
*
* @param <T> The type of the elements that this function can extract timestamps from
*/
@Deprecated
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
/**
 * Stream operator that re-stamps every incoming record with the timestamp produced by an
 * {@link AssignerWithPeriodicWatermarks} and, driven by a processing-time timer, polls that
 * same assigner for its current watermark.
 *
 * <p>The polling interval is taken from the execution config's auto-watermark interval when
 * the operator is opened; no timer is registered if that interval is not positive. A polled
 * watermark is forwarded only if it is strictly larger than the last one this operator emitted.
 *
 * @param <T> The type of the input elements
 */
public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    /** Delay between two consecutive watermark polls; initialised in {@link #open()}. */
    private transient long watermarkInterval;

    /** Timestamp of the most recent watermark emitted by this operator. */
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        // A non-positive interval means periodic polling is never started.
        if (watermarkInterval <= 0) {
            return;
        }
        scheduleNextWatermarkPoll();
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final T value = element.getValue();

        // Records without a timestamp are presented to the assigner with Long.MIN_VALUE.
        final long previousTimestamp;
        if (element.hasTimestamp()) {
            previousTimestamp = element.getTimestamp();
        } else {
            previousTimestamp = Long.MIN_VALUE;
        }

        final long assignedTimestamp = userFunction.extractTimestamp(value, previousTimestamp);
        output.collect(element.replace(element.getValue(), assignedTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        emitWatermarkIfAdvanced();
        // Re-arm the timer so that the assigner is polled again after one more interval.
        scheduleNextWatermarkPoll();
    }

    /**
     * Drops watermarks arriving from upstream, because this operator relies solely on the
     * {@link AssignerWithPeriodicWatermarks} to produce watermarks. The only exception is the
     * {@code Long.MAX_VALUE} watermark, which marks the end of input and is forwarded once so
     * that downstream watermark progress is not blocked.
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        final boolean isFinalWatermark = mark.getTimestamp() == Long.MAX_VALUE;
        final boolean alreadyForwarded = currentWatermark == Long.MAX_VALUE;
        if (!isFinalWatermark || alreadyForwarded) {
            return;
        }

        currentWatermark = Long.MAX_VALUE;
        output.emitWatermark(mark);
    }

    @Override
    public void close() throws Exception {
        super.close();

        // Give the assigner one last chance to advance the watermark.
        emitWatermarkIfAdvanced();
    }

    /** Registers a timer that fires one watermark interval after the current processing time. */
    private void scheduleNextWatermarkPoll() {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Asks the assigner for its current watermark and emits it if it is non-null and strictly
     * greater than the last watermark emitted by this operator.
     */
    private void emitWatermarkIfAdvanced() {
        final Watermark candidate = userFunction.getCurrentWatermark();
        if (candidate == null || candidate.getTimestamp() <= currentWatermark) {
            return;
        }

        currentWatermark = candidate.getTimestamp();
        output.emitWatermark(candidate);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
 * Stream operator that re-stamps every incoming record with the timestamp produced by an
 * {@link AssignerWithPunctuatedWatermarks} and, right after forwarding the record, asks the
 * assigner whether that record should trigger a new watermark.
 *
 * <p>A watermark returned by the assigner is forwarded only if it is strictly larger than the
 * last one this operator emitted.
 *
 * @param <T> The type of the input elements
 */
public class TimestampsAndPunctuatedWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPunctuatedWatermarks<T>>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    /** Timestamp of the most recent watermark emitted by this operator. */
    private long currentWatermark = Long.MIN_VALUE;

    public TimestampsAndPunctuatedWatermarksOperator(AssignerWithPunctuatedWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final T value = element.getValue();

        // Records without a timestamp are presented to the assigner with Long.MIN_VALUE.
        final long previousTimestamp;
        if (element.hasTimestamp()) {
            previousTimestamp = element.getTimestamp();
        } else {
            previousTimestamp = Long.MIN_VALUE;
        }

        final long assignedTimestamp = userFunction.extractTimestamp(value, previousTimestamp);
        output.collect(element.replace(element.getValue(), assignedTimestamp));

        // The record is emitted first; a watermark it triggers follows it.
        final Watermark candidate = userFunction.checkAndGetNextWatermark(value, assignedTimestamp);
        if (candidate == null || candidate.getTimestamp() <= currentWatermark) {
            return;
        }

        currentWatermark = candidate.getTimestamp();
        output.emitWatermark(candidate);
    }

    /**
     * Drops watermarks arriving from upstream, because this operator relies solely on the
     * {@link AssignerWithPunctuatedWatermarks} to produce watermarks. The only exception is the
     * {@code Long.MAX_VALUE} watermark, which marks the end of input and is forwarded once so
     * that downstream watermark progress is not blocked.
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        final boolean isFinalWatermark = mark.getTimestamp() == Long.MAX_VALUE;
        final boolean alreadyForwarded = currentWatermark == Long.MAX_VALUE;
        if (!isFinalWatermark || alreadyForwarded) {
            return;
        }

        currentWatermark = Long.MAX_VALUE;
        output.emitWatermark(mark);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Bridges the legacy {@link AssignerWithPeriodicWatermarks} interface to the
 * {@link WatermarkGenerator} interface: per-event callbacks are ignored, and every periodic
 * emit forwards whatever watermark the wrapped assigner currently reports (if any).
 */
@Internal
@SuppressWarnings("deprecation")
public final class AssignerWithPeriodicWatermarksAdapter<T> implements WatermarkGenerator<T> {

    /** The wrapped legacy assigner that is polled on each periodic emit. */
    private final AssignerWithPeriodicWatermarks<T> wms;

    public AssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms) {
        this.wms = checkNotNull(wms);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Intentionally empty: this adapter emits watermarks only from onPeriodicEmit.
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark legacyWatermark =
                wms.getCurrentWatermark();
        if (legacyWatermark == null) {
            // The assigner has nothing to report for this period.
            return;
        }

        // Convert from the legacy streaming Watermark type to the eventtime Watermark type.
        final Watermark converted = new Watermark(legacyWatermark.getTimestamp());
        output.emitWatermark(converted);
    }

    // ------------------------------------------------------------------------

    /**
     * A {@link WatermarkStrategy} whose generator is the given
     * {@link AssignerWithPeriodicWatermarks} wrapped in an
     * {@link AssignerWithPeriodicWatermarksAdapter}.
     */
    public static final class Strategy<T> implements WatermarkStrategy<T> {

        private static final long serialVersionUID = 1L;

        /** The legacy assigner handed to every generator this strategy creates. */
        private final AssignerWithPeriodicWatermarks<T> wms;

        public Strategy(AssignerWithPeriodicWatermarks<T> wms) {
            this.wms = checkNotNull(wms);
        }

        @Override
        public WatermarkGenerator<T> createWatermarkGenerator() {
            final WatermarkGenerator<T> generator = new AssignerWithPeriodicWatermarksAdapter<>(wms);
            return generator;
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Bridges the legacy {@link AssignerWithPunctuatedWatermarks} interface to the
 * {@link WatermarkGenerator} interface: every event is handed to the wrapped assigner, and any
 * watermark it returns is forwarded; periodic emits are ignored.
 */
@Internal
@SuppressWarnings("deprecation")
public final class AssignerWithPunctuatedWatermarksAdapter<T> implements WatermarkGenerator<T> {

    /** The wrapped legacy assigner that is consulted for every event. */
    private final AssignerWithPunctuatedWatermarks<T> wms;

    public AssignerWithPunctuatedWatermarksAdapter(AssignerWithPunctuatedWatermarks<T> wms) {
        this.wms = checkNotNull(wms);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark legacyWatermark =
                wms.checkAndGetNextWatermark(event, eventTimestamp);
        if (legacyWatermark == null) {
            // This event does not trigger a watermark.
            return;
        }

        // Convert from the legacy streaming Watermark type to the eventtime Watermark type.
        final Watermark converted = new Watermark(legacyWatermark.getTimestamp());
        output.emitWatermark(converted);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: this adapter emits watermarks only from onEvent.
    }

    // ------------------------------------------------------------------------

    /**
     * A {@link WatermarkStrategy} whose generator is the given
     * {@link AssignerWithPunctuatedWatermarks} wrapped in an
     * {@link AssignerWithPunctuatedWatermarksAdapter}.
     */
    public static final class Strategy<T> implements WatermarkStrategy<T> {

        private static final long serialVersionUID = 1L;

        /** The legacy assigner handed to every generator this strategy creates. */
        private final AssignerWithPunctuatedWatermarks<T> wms;

        public Strategy(AssignerWithPunctuatedWatermarks<T> wms) {
            this.wms = checkNotNull(wms);
        }

        @Override
        public WatermarkGenerator<T> createWatermarkGenerator() {
            final WatermarkGenerator<T> generator = new AssignerWithPunctuatedWatermarksAdapter<>(wms);
            return generator;
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
 * Tests for {@link TimestampsAndPeriodicWatermarksOperator}.
 */
public class TimestampsAndPeriodicWatermarksOperatorTest {

    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        final TimestampsAndPeriodicWatermarksOperator<Long> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(new LongExtractor());

        final OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator);

        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
        testHarness.open();

        testHarness.processElement(new StreamRecord<>(1L, 1));
        testHarness.processElement(new StreamRecord<>(2L, 1));
        testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
        testHarness.processElement(new StreamRecord<>(3L, 3));
        testHarness.processElement(new StreamRecord<>(4L, 3));

        // First part of the sequence: keep polling until the watermark reaches "3",
        // which must be the result of the "4" element.
        final long timeAfterFirstPart = drainUntilWatermark(testHarness, 1L, -1L, 3L, 0L);

        testHarness.processElement(new StreamRecord<>(4L, 4));
        testHarness.processElement(new StreamRecord<>(5L, 4));
        testHarness.processElement(new StreamRecord<>(6L, 4));
        testHarness.processElement(new StreamRecord<>(7L, 4));
        testHarness.processElement(new StreamRecord<>(8L, 4));

        // Second part of the sequence: keep polling until the watermark reaches "7",
        // which must be the result of the "8" element.
        drainUntilWatermark(testHarness, 4L, 2L, 7L, timeAfterFirstPart);

        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        final Watermark finalWatermark = (Watermark) testHarness.getOutput().poll();
        assertEquals(Long.MAX_VALUE, finalWatermark.getTimestamp());
    }

    @Test
    public void testNegativeTimestamps() throws Exception {
        final AssignerWithPeriodicWatermarks<Long> assigner = new NeverWatermarkExtractor();

        final TimestampsAndPeriodicWatermarksOperator<Long> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(assigner);

        final OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator);

        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
        testHarness.open();

        final long[] values = { Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE };

        for (int i = 0; i < values.length; i++) {
            testHarness.processElement(new StreamRecord<>(values[i]));
        }

        // Every record must come out carrying its own value as timestamp, in input order.
        final ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (int i = 0; i < values.length; i++) {
            final StreamRecord<?> emitted = (StreamRecord<?>) output.poll();
            assertEquals(values[i], emitted.getTimestamp());
        }
    }

    // ------------------------------------------------------------------------

    /**
     * Consumes and validates the harness output until a watermark of at least
     * {@code targetWatermark} has been observed. Whenever the output is empty, processing time
     * is advanced by 10 so that the operator's timer can fire. The remaining output is cleared
     * before returning.
     *
     * @param harness the harness wrapping the operator under test
     * @param firstExpectedValue the value (and timestamp) expected on the next record
     * @param initialWatermark the watermark assumed to have been seen so far
     * @param targetWatermark the watermark value at which draining stops
     * @param startTime the processing time from which to continue advancing
     * @return the processing time that was last set (or {@code startTime} if it never advanced)
     */
    private long drainUntilWatermark(
            OneInputStreamOperatorTestHarness<Long, Long> harness,
            long firstExpectedValue,
            long initialWatermark,
            long targetWatermark,
            long startTime) throws Exception {

        final ConcurrentLinkedQueue<Object> output = harness.getOutput();
        long expectedValue = firstExpectedValue;
        long seenWatermark = initialWatermark;
        long processingTime = startTime;

        while (seenWatermark < targetWatermark) {
            if (output.isEmpty()) {
                // Nothing to validate yet: move the clock forward and try again.
                processingTime += 10;
                harness.setProcessingTime(processingTime);
                continue;
            }

            final Object head = output.poll();
            assertNotNull(head);

            final Tuple2<Long, Long> progress = validateElement(head, expectedValue, seenWatermark);
            expectedValue = progress.f0;
            seenWatermark = progress.f1;

            // check the invariant
            assertTrue(seenWatermark < expectedValue);
        }

        output.clear();
        return processingTime;
    }

    /**
     * Validates a single output element and returns the updated
     * (next expected record value, last watermark) pair.
     */
    private Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
        if (element instanceof StreamRecord) {
            @SuppressWarnings("unchecked")
            final StreamRecord<Long> record = (StreamRecord<Long>) element;
            assertEquals(nextElementValue, record.getValue().longValue());
            assertEquals(nextElementValue, record.getTimestamp());
            return new Tuple2<>(nextElementValue + 1, currentWatermark);
        }

        if (element instanceof Watermark) {
            final long watermarkTimestamp = ((Watermark) element).getTimestamp();
            assertTrue(watermarkTimestamp > currentWatermark);
            return new Tuple2<>(nextElementValue, watermarkTimestamp);
        }

        throw new IllegalArgumentException("unrecognized element: " + element);
    }

    // ------------------------------------------------------------------------

    /**
     * Assigner that uses the element value as its timestamp and reports a watermark one below
     * the last extracted timestamp (or {@code Long.MIN_VALUE} before any element was seen).
     */
    private static class LongExtractor implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1L;

        private long currentTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            currentTimestamp = element;
            return element;
        }

        @Override
        public Watermark getCurrentWatermark() {
            if (currentTimestamp == Long.MIN_VALUE) {
                return new Watermark(Long.MIN_VALUE);
            }
            return new Watermark(currentTimestamp - 1);
        }
    }

    /** Assigner that uses the element value as its timestamp and never reports a watermark. */
    private static class NeverWatermarkExtractor implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            return element;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return null;
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
/**
 * Tests for {@link TimestampsAndPunctuatedWatermarksOperator}.
 */
public class TimestampsAndPunctuatedWatermarksOperatorTest {

    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        final TimestampsAndPunctuatedWatermarksOperator<Tuple2<Long, Boolean>> operator =
                new TimestampsAndPunctuatedWatermarksOperator<>(new PunctuatedExtractor());

        final OneInputStreamOperatorTestHarness<Tuple2<Long, Boolean>, Tuple2<Long, Boolean>> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator);

        testHarness.open();

        testHarness.processElement(input(3L, true));
        testHarness.processElement(input(5L, false));
        testHarness.processElement(input(4L, false));
        testHarness.processWatermark(new Watermark(10)); // this watermark should be ignored
        testHarness.processElement(input(4L, false));
        testHarness.processElement(input(4L, true));
        testHarness.processElement(input(9L, false));
        testHarness.processElement(input(5L, false));
        testHarness.processElement(input(7L, true));
        testHarness.processElement(input(10L, false));

        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));

        final ConcurrentLinkedQueue<Object> output = testHarness.getOutput();

        assertNextRecordTimestamp(output, 3L);
        assertNextWatermark(output, 3L);

        assertNextRecordTimestamp(output, 5L);
        assertNextRecordTimestamp(output, 4L);
        assertNextRecordTimestamp(output, 4L);
        assertNextRecordTimestamp(output, 4L);
        assertNextWatermark(output, 4L);

        assertNextRecordTimestamp(output, 9L);
        assertNextRecordTimestamp(output, 5L);
        assertNextRecordTimestamp(output, 7L);
        assertNextWatermark(output, 7L);

        assertNextRecordTimestamp(output, 10L);
        assertNextWatermark(output, Long.MAX_VALUE);
    }

    @Test
    public void testZeroOnNegativeTimestamps() throws Exception {
        final AssignerWithPunctuatedWatermarks<Long> assigner = new NeverWatermarkExtractor();

        final TimestampsAndPunctuatedWatermarksOperator<Long> operator =
                new TimestampsAndPunctuatedWatermarksOperator<>(assigner);

        final OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator);

        testHarness.open();

        final long[] values = { Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE };

        for (int i = 0; i < values.length; i++) {
            testHarness.processElement(new StreamRecord<>(values[i]));
        }

        // Every record must come out carrying its own value as timestamp, in input order.
        final ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (int i = 0; i < values.length; i++) {
            assertNextRecordTimestamp(output, values[i]);
        }
    }

    // ------------------------------------------------------------------------

    /**
     * Builds an input record whose payload is (timestamp to extract, emit-watermark flag) and
     * whose pre-assigned record timestamp is 0.
     */
    private static StreamRecord<Tuple2<Long, Boolean>> input(long timestamp, boolean emitWatermark) {
        final Tuple2<Long, Boolean> payload = new Tuple2<>(timestamp, emitWatermark);
        return new StreamRecord<>(payload, 0L);
    }

    /** Polls the next output element, expecting a record with the given timestamp. */
    private static void assertNextRecordTimestamp(ConcurrentLinkedQueue<Object> output, long expected) {
        final StreamRecord<?> record = (StreamRecord<?>) output.poll();
        assertEquals(expected, record.getTimestamp());
    }

    /** Polls the next output element, expecting a watermark with the given timestamp. */
    private static void assertNextWatermark(ConcurrentLinkedQueue<Object> output, long expected) {
        final Watermark watermark = (Watermark) output.poll();
        assertEquals(expected, watermark.getTimestamp());
    }

    // ------------------------------------------------------------------------

    /**
     * Assigner that takes the timestamp from the tuple's first field and emits a watermark at
     * that timestamp whenever the tuple's second field is {@code true}.
     */
    private static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Boolean>> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Tuple2<Long, Boolean> element, long previousTimestamp) {
            return element.f0;
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Long, Boolean> lastElement, long extractedTimestamp) {
            if (lastElement.f1) {
                return new Watermark(extractedTimestamp);
            }
            return null;
        }
    }

    /** Assigner that uses the element value as its timestamp and never emits a watermark. */
    private static class NeverWatermarkExtractor implements AssignerWithPunctuatedWatermarks<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            return element;
        }

        @Override
        public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
            return null;
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册