diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java new file mode 100644 index 0000000000000000000000000000000000000000..83976e3536e37c1d889eda45c48037e5ff4b5b60 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.eventtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link WatermarkOutputMultiplexer} combines the watermark (and idleness) updates of multiple + * partitions/shards/splits into one combined watermark update and forwards it to an underlying + * {@link WatermarkOutput}. + * + *

A multiplexed output can either be immediate or deferred. Watermark updates on an immediate + * output will potentially directly affect the combined watermark state, which will be forwarded to + * the underlying output immediately. Watermark updates on a deferred output will only update an + * internal state but not directly update the combined watermark state. Only when {@link + * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the + * underlying output. + * + *

For registering a new multiplexed output, you must first call {@link #registerNewOutput()} + * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output + * ID you get from that. You can get both an immediate and deferred output for a given output ID, + * you can also call the getters multiple times. + * + *

WARNING:This class is not thread safe. + */ +@Internal +public class WatermarkOutputMultiplexer { + + /** + * The {@link WatermarkOutput} that we use to emit our multiplexed watermark updates. We assume + * that outside code holds a coordinating lock so we don't lock in this class when accessing + * this {@link WatermarkOutput}. + */ + private final WatermarkOutput underlyingOutput; + + /** The id to use for the next registered output. */ + private int nextOutputId = 0; + + /** The combined watermark over the per-output watermarks. */ + private long combinedWatermark = Long.MIN_VALUE; + + /** + * Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id. + */ + private final Map watermarkPerOutputId; + + /** + * List of all watermark outputs, for efficient access. + */ + private final List watermarkOutputs; + + /** + * Creates a new {@link WatermarkOutputMultiplexer} that emits combined updates to the given + * {@link WatermarkOutput}. + */ + public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) { + this.underlyingOutput = underlyingOutput; + this.watermarkPerOutputId = new HashMap<>(); + this.watermarkOutputs = new ArrayList<>(); + } + + /** + * Registers a new multiplexed output, which creates internal states for that output and returns + * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that + * output. + */ + public int registerNewOutput() { + int newOutputId = nextOutputId; + nextOutputId++; + OutputState outputState = new OutputState(); + watermarkPerOutputId.put(newOutputId, outputState); + watermarkOutputs.add(outputState); + return newOutputId; + } + + /** + * Returns an immediate {@link WatermarkOutput} for the given output ID. + * + *

>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred + * outputs. + */ + public WatermarkOutput getImmediateOutput(int outputId) { + Preconditions.checkArgument( + watermarkPerOutputId.containsKey(outputId), + "no output registered under id " + outputId); + + OutputState outputState = watermarkPerOutputId.get(outputId); + return new ImmediateOutput(outputState); + } + + /** + * Returns a deferred {@link WatermarkOutput} for the given output ID. + * + *

>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred + * outputs. + */ + public WatermarkOutput getDeferredOutput(int outputId) { + Preconditions.checkArgument( + watermarkPerOutputId.containsKey(outputId), + "no output registered under id " + outputId); + + OutputState outputState = watermarkPerOutputId.get(outputId); + return new DeferredOutput(outputState); + } + + /** + * Tells the {@link WatermarkOutputMultiplexer} to combine all outstanding deferred watermark + * updates and possibly emit a new update to the underlying {@link WatermarkOutput}. + */ + public void onPeriodicEmit() { + updateCombinedWatermark(); + } + + /** + * Checks whether we need to update the combined watermark. Should be called when a newly + * emitted per-output watermark is higher than the max so far or if we need to combined the + * deferred per-output updates. + */ + private void updateCombinedWatermark() { + long minimumOverAllOutputs = Long.MAX_VALUE; + + boolean hasOutputs = false; + boolean allIdle = true; + for (OutputState outputState : watermarkOutputs) { + if (!outputState.isIdle()) { + minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark()); + allIdle = false; + } + hasOutputs = true; + } + + // if we don't have any outputs minimumOverAllOutputs is not valid, it's still + // at its initial Long.MAX_VALUE state and we must not emit that + if (!hasOutputs) { + return; + } + + if (allIdle) { + underlyingOutput.markIdle(); + } else if (minimumOverAllOutputs > combinedWatermark) { + combinedWatermark = minimumOverAllOutputs; + underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs)); + } + } + + /** + * Per-output watermark state. + */ + private static class OutputState { + private volatile long watermark = Long.MIN_VALUE; + private volatile boolean idle = false; + + /** + * Returns the current watermark timestamp. This will throw {@link IllegalStateException} if + * the output is currently idle. + */ + public long getWatermark() { + checkState(!idle, "Output is idle."); + return watermark; + } + + /** + * Returns true if the watermark was advanced, that is if the new watermark is larger than + * the previous one. + * + *

Setting a watermark will clear the idleness flag. + */ + public boolean setWatermark(long watermark) { + this.idle = false; + if (watermark > this.watermark) { + this.watermark = watermark; + return true; + } else { + return false; + } + } + + public boolean isIdle() { + return idle; + } + + public void setIdle(boolean idle) { + this.idle = idle; + } + } + + /** + * Updating the state of an immediate output can possible lead to a combined watermark update to + * the underlying {@link WatermarkOutput}. + */ + private class ImmediateOutput implements WatermarkOutput { + + private final OutputState state; + + public ImmediateOutput(OutputState state) { + this.state = state; + } + + @Override + public void emitWatermark(Watermark watermark) { + long timestamp = watermark.getTimestamp(); + boolean wasUpdated = state.setWatermark(timestamp); + + // if it's higher than the max watermark so far we might have to update the + // combined watermark + if (wasUpdated && timestamp > combinedWatermark) { + updateCombinedWatermark(); + } + } + + @Override + public void markIdle() { + state.setIdle(true); + + // this can always lead to an advancing watermark. We don't know if this output + // was holding back the watermark or not. + updateCombinedWatermark(); + } + } + + /** + * Updating the state of a deferred output will never lead to a combined watermark update. Only + * when {@link WatermarkOutputMultiplexer#onPeriodicEmit()} is called will the deferred updates + * be combined into a potential combined update of the underlying {@link WatermarkOutput}. + */ + private static class DeferredOutput implements WatermarkOutput { + + private final OutputState state; + + public DeferredOutput(OutputState state) { + this.state = state; + } + + @Override + public void emitWatermark(Watermark watermark) { + state.setWatermark(watermark.getTimestamp()); + } + + @Override + public void markIdle() { + state.setIdle(true); + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..224bf482f709d77fd46891f1278e1238b388d965 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.eventtime; + +import org.junit.Test; + +import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link WatermarkOutputMultiplexer}. + */ +public class WatermarkOutputMultiplexerTest { + + @Test + public void singleImmediateWatermark() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createImmediateOutput(multiplexer); + + watermarkOutput.emitWatermark(new Watermark(0)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(0))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void singleImmediateIdleness() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createImmediateOutput(multiplexer); + + watermarkOutput.markIdle(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), nullValue()); + assertThat(underlyingWatermarkOutput.isIdle(), is(true)); + } + + @Test + public void singleImmediateWatermarkAfterIdleness() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createImmediateOutput(multiplexer); + + watermarkOutput.markIdle(); + assertThat(underlyingWatermarkOutput.isIdle(), is(true)); + + watermarkOutput.emitWatermark(new Watermark(0)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(0))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void multipleImmediateWatermark() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput1 = createImmediateOutput(multiplexer); + WatermarkOutput watermarkOutput2 = createImmediateOutput(multiplexer); + WatermarkOutput watermarkOutput3 = createImmediateOutput(multiplexer); + + watermarkOutput1.emitWatermark(new Watermark(2)); + watermarkOutput2.emitWatermark(new Watermark(5)); + watermarkOutput3.markIdle(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(2))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void whenImmediateOutputBecomesIdleWatermarkAdvances() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput1 = createImmediateOutput(multiplexer); + WatermarkOutput watermarkOutput2 = createImmediateOutput(multiplexer); + + watermarkOutput1.emitWatermark(new Watermark(2)); + watermarkOutput2.emitWatermark(new Watermark(5)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(2))); + + watermarkOutput1.markIdle(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + } + + @Test + public void combinedWatermarkDoesNotRegressWhenIdleOutputRegresses() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput1 = createImmediateOutput(multiplexer); + WatermarkOutput watermarkOutput2 = createImmediateOutput(multiplexer); + + watermarkOutput1.emitWatermark(new Watermark(2)); + watermarkOutput2.emitWatermark(new Watermark(5)); + watermarkOutput1.markIdle(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + + watermarkOutput1.emitWatermark(new Watermark(3)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + } + + /** + * This test makes sure that we don't output any update if there are zero outputs. Due to how + * aggregation of deferred updates in the KafkaConsumer works we had a bug there that caused a + * Long.MAX_VALUE watermark to be emitted in case of zero partitions. + */ + @Test + public void noCombinedDeferredUpdateWhenWeHaveZeroOutputs() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue())); + } + + @Test + public void deferredOutputDoesNotImmediatelyAdvanceWatermark() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput1 = createDeferredOutput(multiplexer); + WatermarkOutput watermarkOutput2 = createDeferredOutput(multiplexer); + + watermarkOutput1.emitWatermark(new Watermark(0)); + watermarkOutput2.emitWatermark(new Watermark(1)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), nullValue()); + + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(0))); + } + + @Test + public void singleDeferredWatermark() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createDeferredOutput(multiplexer); + + watermarkOutput.emitWatermark(new Watermark(0)); + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(0))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void singleDeferredIdleness() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createDeferredOutput(multiplexer); + + watermarkOutput.markIdle(); + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), nullValue()); + assertThat(underlyingWatermarkOutput.isIdle(), is(true)); + } + + @Test + public void singleDeferredWatermarkAfterIdleness() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput = createDeferredOutput(multiplexer); + + watermarkOutput.markIdle(); + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.isIdle(), is(true)); + + watermarkOutput.emitWatermark(new Watermark(0)); + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(0))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void multipleDeferredWatermark() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput watermarkOutput1 = createDeferredOutput(multiplexer); + WatermarkOutput watermarkOutput2 = createDeferredOutput(multiplexer); + WatermarkOutput watermarkOutput3 = createDeferredOutput(multiplexer); + + watermarkOutput1.emitWatermark(new Watermark(2)); + watermarkOutput2.emitWatermark(new Watermark(5)); + watermarkOutput3.markIdle(); + + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(2))); + assertThat(underlyingWatermarkOutput.isIdle(), is(false)); + } + + @Test + public void immediateUpdatesTakeDeferredUpdatesIntoAccount() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + WatermarkOutput immediateOutput = createImmediateOutput(multiplexer); + WatermarkOutput deferredOutput = createDeferredOutput(multiplexer); + + deferredOutput.emitWatermark(new Watermark(5)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue())); + + immediateOutput.emitWatermark(new Watermark(2)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(2))); + } + + @Test + public void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + int outputId = multiplexer.registerNewOutput(); + WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId); + WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId); + + deferredOutput.emitWatermark(new Watermark(5)); + multiplexer.onPeriodicEmit(); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + + immediateOutput.emitWatermark(new Watermark(2)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + + multiplexer.onPeriodicEmit(); + assertThat(underlyingWatermarkOutput.lastWatermark(), is(watermark(5))); + } + + @Test + public void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() { + TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); + WatermarkOutputMultiplexer multiplexer = + new WatermarkOutputMultiplexer(underlyingWatermarkOutput); + + int outputId = multiplexer.registerNewOutput(); + WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId); + WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId); + + deferredOutput.emitWatermark(new Watermark(5)); + immediateOutput.emitWatermark(new Watermark(2)); + + assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue())); + } + + /** + * Convenience method so we don't have to go through the output ID dance when we only want an + * immediate output for a given output ID. + */ + private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) { + int outputId = multiplexer.registerNewOutput(); + return multiplexer.getImmediateOutput(outputId); + } + + /** + * Convenience method so we don't have to go through the output ID dance when we only want an + * deferred output for a given output ID. + */ + private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) { + int outputId = multiplexer.registerNewOutput(); + return multiplexer.getDeferredOutput(outputId); + } + + private static TestingWatermarkOutput createTestingWatermarkOutput() { + return new TestingWatermarkOutput(); + } +}