[FLINK-17669] Add WatermarkOutputMultiplexer

This can be used in source implementations (or anywhere else really)
that need to multiplex watermark updates from multiple partitions/splits
into one single (network) watermark output.
Parent: 3d339de8
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkState;
/**
 * Merges the watermark and idleness signals of several partitions/shards/splits into a single
 * signal and forwards that signal to one underlying {@link WatermarkOutput}.
 *
 * <p>The combined watermark is the minimum over the watermarks of all registered outputs that are
 * currently not idle. It is only forwarded when it is larger than the last forwarded value. If
 * every registered output is idle, the underlying output is marked idle instead.
 *
 * <p>Every registered output can be driven through two kinds of handles:
 *
 * <ul>
 *   <li>An <i>immediate</i> handle may re-evaluate the combined watermark during the call, so an
 *       update can reach the underlying output right away.
 *   <li>A <i>deferred</i> handle only records the update in the per-output state. The recorded
 *       state is taken into account the next time the combined watermark is evaluated, which
 *       happens on {@link #onPeriodicEmit()} or when an immediate handle triggers an evaluation.
 * </ul>
 *
 * <p>Usage: call {@link #registerNewOutput()} to obtain an output ID, then pass that ID to {@link
 * #getImmediateOutput(int)} and/or {@link #getDeferredOutput(int)}. Both kinds of handles may be
 * requested for the same ID, and each getter may be called any number of times.
 *
 * <p><b>WARNING:</b> This class is not thread safe.
 */
@Internal
public class WatermarkOutputMultiplexer {

    /**
     * Receiver of the combined watermark and idleness updates. This class does not lock around
     * calls to it; outside code is assumed to hold a coordinating lock.
     */
    private final WatermarkOutput underlyingOutput;

    /** Per-output state keyed by output ID; used to resolve the handle getters. */
    private final Map<Integer, OutputState> watermarkPerOutputId = new HashMap<>();

    /** The same per-output states as a list, for cheap iteration when combining. */
    private final List<OutputState> watermarkOutputs = new ArrayList<>();

    /** The ID that the next call to {@link #registerNewOutput()} hands out. */
    private int nextOutputId = 0;

    /** The last combined watermark that was forwarded to the underlying output. */
    private long combinedWatermark = Long.MIN_VALUE;

    /**
     * Creates a multiplexer that forwards its combined updates to the given output.
     *
     * @param underlyingOutput the output that receives the combined updates
     */
    public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
        this.underlyingOutput = underlyingOutput;
    }

    /**
     * Registers a new multiplexed output and sets up its internal state.
     *
     * @return the ID under which immediate and deferred handles for the new output can be
     *     requested
     */
    public int registerNewOutput() {
        final int assignedId = nextOutputId++;
        final OutputState freshState = new OutputState();
        watermarkOutputs.add(freshState);
        watermarkPerOutputId.put(assignedId, freshState);
        return assignedId;
    }

    /**
     * Returns an immediate {@link WatermarkOutput} for the given output ID.
     *
     * <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
     * outputs.
     *
     * @param outputId an ID previously returned by {@link #registerNewOutput()}
     * @throws IllegalArgumentException if no output is registered under the given ID
     */
    public WatermarkOutput getImmediateOutput(int outputId) {
        return new ImmediateOutput(stateFor(outputId));
    }

    /**
     * Returns a deferred {@link WatermarkOutput} for the given output ID.
     *
     * <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
     * outputs.
     *
     * @param outputId an ID previously returned by {@link #registerNewOutput()}
     * @throws IllegalArgumentException if no output is registered under the given ID
     */
    public WatermarkOutput getDeferredOutput(int outputId) {
        return new DeferredOutput(stateFor(outputId));
    }

    /**
     * Combines all per-output states, including updates recorded through deferred handles, and
     * possibly emits a new update to the underlying {@link WatermarkOutput}.
     */
    public void onPeriodicEmit() {
        updateCombinedWatermark();
    }

    /** Looks up the state registered under the given ID, rejecting unknown IDs. */
    private OutputState stateFor(int outputId) {
        final OutputState registered = watermarkPerOutputId.get(outputId);
        // Only non-null states are ever stored, so a null result means the ID is unknown.
        Preconditions.checkArgument(
                registered != null,
                "no output registered under id " + outputId);
        return registered;
    }

    /**
     * Recomputes the minimum watermark over all non-idle outputs and forwards the result: the
     * underlying output is marked idle when all outputs are idle, otherwise a watermark is emitted
     * if the minimum is larger than the last forwarded one.
     */
    private void updateCombinedWatermark() {
        // Without any registered output there is nothing meaningful to forward; in particular
        // the Long.MAX_VALUE starting value of the minimum must never be emitted.
        if (watermarkOutputs.isEmpty()) {
            return;
        }

        boolean anyActive = false;
        long lowestActiveWatermark = Long.MAX_VALUE;
        for (OutputState candidate : watermarkOutputs) {
            if (candidate.isIdle()) {
                continue;
            }
            anyActive = true;
            lowestActiveWatermark = Math.min(lowestActiveWatermark, candidate.getWatermark());
        }

        if (!anyActive) {
            underlyingOutput.markIdle();
            return;
        }

        if (lowestActiveWatermark > combinedWatermark) {
            combinedWatermark = lowestActiveWatermark;
            underlyingOutput.emitWatermark(new Watermark(lowestActiveWatermark));
        }
    }

    /** Watermark and idleness of a single registered output. */
    private static class OutputState {

        private volatile long watermark = Long.MIN_VALUE;

        private volatile boolean idle = false;

        /**
         * Returns the current watermark timestamp of this output.
         *
         * @throws IllegalStateException if the output is currently idle
         */
        public long getWatermark() {
            checkState(!idle, "Output is idle.");
            return watermark;
        }

        /**
         * Clears the idleness flag and stores the given timestamp if it is larger than the
         * current one.
         *
         * @return true if the stored watermark advanced, false if it was left unchanged
         */
        public boolean setWatermark(long watermark) {
            this.idle = false;
            final boolean advanced = watermark > this.watermark;
            if (advanced) {
                this.watermark = watermark;
            }
            return advanced;
        }

        public boolean isIdle() {
            return idle;
        }

        public void setIdle(boolean idle) {
            this.idle = idle;
        }
    }

    /**
     * Handle whose updates can lead to an update of the underlying {@link WatermarkOutput} during
     * the call itself.
     */
    private class ImmediateOutput implements WatermarkOutput {

        private final OutputState state;

        public ImmediateOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            final long timestamp = watermark.getTimestamp();
            if (!state.setWatermark(timestamp)) {
                // The per-output watermark did not advance, so the combined one cannot either.
                return;
            }
            // Only a timestamp above the last forwarded combined watermark can move it forward.
            if (timestamp > combinedWatermark) {
                updateCombinedWatermark();
            }
        }

        @Override
        public void markIdle() {
            state.setIdle(true);
            // This output may or may not have been holding back the combined watermark, so
            // always re-evaluate.
            updateCombinedWatermark();
        }
    }

    /**
     * Handle whose updates are only recorded in the per-output state and never trigger a combined
     * update by themselves. The recorded state is combined when {@link
     * WatermarkOutputMultiplexer#onPeriodicEmit()} is called.
     */
    private static class DeferredOutput implements WatermarkOutput {

        private final OutputState state;

        public DeferredOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            state.setWatermark(watermark.getTimestamp());
        }

        @Override
        public void markIdle() {
            state.setIdle(true);
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.junit.Test;
import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
/**
 * Tests for the {@link WatermarkOutputMultiplexer}.
 *
 * <p>The recording sink and the multiplexer under test are instance fields. JUnit 4 creates a new
 * instance of the test class for every test method, so each test starts with a fresh sink and an
 * empty multiplexer.
 */
public class WatermarkOutputMultiplexerTest {

    /** Records what the multiplexer forwards. */
    private final TestingWatermarkOutput sink = createTestingWatermarkOutput();

    /** The multiplexer under test, wired to {@link #sink}. */
    private final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(sink);

    // ------------------------------------------------------------------------
    //  immediate outputs
    // ------------------------------------------------------------------------

    /** A watermark on the only immediate output is forwarded directly. */
    @Test
    public void singleImmediateWatermark() {
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.emitWatermark(new Watermark(0));

        assertThat(sink.lastWatermark(), is(watermark(0)));
        assertThat(sink.isIdle(), is(false));
    }

    /** Idleness of the only immediate output is forwarded directly. */
    @Test
    public void singleImmediateIdleness() {
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.markIdle();

        assertThat(sink.lastWatermark(), nullValue());
        assertThat(sink.isIdle(), is(true));
    }

    /** A watermark after idleness makes the sink active again. */
    @Test
    public void singleImmediateWatermarkAfterIdleness() {
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.markIdle();
        assertThat(sink.isIdle(), is(true));

        output.emitWatermark(new Watermark(0));
        assertThat(sink.lastWatermark(), is(watermark(0)));
        assertThat(sink.isIdle(), is(false));
    }

    /** The combined watermark is the minimum over the non-idle immediate outputs. */
    @Test
    public void multipleImmediateWatermark() {
        WatermarkOutput first = createImmediateOutput(multiplexer);
        WatermarkOutput second = createImmediateOutput(multiplexer);
        WatermarkOutput third = createImmediateOutput(multiplexer);

        first.emitWatermark(new Watermark(2));
        second.emitWatermark(new Watermark(5));
        third.markIdle();

        assertThat(sink.lastWatermark(), is(watermark(2)));
        assertThat(sink.isIdle(), is(false));
    }

    /** When the output holding back the watermark turns idle, the combined watermark advances. */
    @Test
    public void whenImmediateOutputBecomesIdleWatermarkAdvances() {
        WatermarkOutput first = createImmediateOutput(multiplexer);
        WatermarkOutput second = createImmediateOutput(multiplexer);

        first.emitWatermark(new Watermark(2));
        second.emitWatermark(new Watermark(5));
        assertThat(sink.lastWatermark(), is(watermark(2)));

        first.markIdle();
        assertThat(sink.lastWatermark(), is(watermark(5)));
    }

    /** An output that returns from idleness with a lower watermark does not pull it back. */
    @Test
    public void combinedWatermarkDoesNotRegressWhenIdleOutputRegresses() {
        WatermarkOutput first = createImmediateOutput(multiplexer);
        WatermarkOutput second = createImmediateOutput(multiplexer);

        first.emitWatermark(new Watermark(2));
        second.emitWatermark(new Watermark(5));
        first.markIdle();
        assertThat(sink.lastWatermark(), is(watermark(5)));

        first.emitWatermark(new Watermark(3));
        assertThat(sink.lastWatermark(), is(watermark(5)));
    }

    // ------------------------------------------------------------------------
    //  deferred outputs
    // ------------------------------------------------------------------------

    /**
     * This test makes sure that we don't output any update if there are zero outputs. Due to how
     * aggregation of deferred updates in the KafkaConsumer works we had a bug there that caused a
     * Long.MAX_VALUE watermark to be emitted in case of zero partitions.
     */
    @Test
    public void noCombinedDeferredUpdateWhenWeHaveZeroOutputs() {
        multiplexer.onPeriodicEmit();

        assertThat(sink.lastWatermark(), is(nullValue()));
    }

    /** Deferred updates only become visible after the periodic emit. */
    @Test
    public void deferredOutputDoesNotImmediatelyAdvanceWatermark() {
        WatermarkOutput first = createDeferredOutput(multiplexer);
        WatermarkOutput second = createDeferredOutput(multiplexer);

        first.emitWatermark(new Watermark(0));
        second.emitWatermark(new Watermark(1));
        assertThat(sink.lastWatermark(), nullValue());

        multiplexer.onPeriodicEmit();
        assertThat(sink.lastWatermark(), is(watermark(0)));
    }

    /** A watermark on the only deferred output is forwarded on the periodic emit. */
    @Test
    public void singleDeferredWatermark() {
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.emitWatermark(new Watermark(0));
        multiplexer.onPeriodicEmit();

        assertThat(sink.lastWatermark(), is(watermark(0)));
        assertThat(sink.isIdle(), is(false));
    }

    /** Idleness of the only deferred output is forwarded on the periodic emit. */
    @Test
    public void singleDeferredIdleness() {
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.markIdle();
        multiplexer.onPeriodicEmit();

        assertThat(sink.lastWatermark(), nullValue());
        assertThat(sink.isIdle(), is(true));
    }

    /** A deferred watermark after idleness makes the sink active again on the next emit. */
    @Test
    public void singleDeferredWatermarkAfterIdleness() {
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.markIdle();
        multiplexer.onPeriodicEmit();
        assertThat(sink.isIdle(), is(true));

        output.emitWatermark(new Watermark(0));
        multiplexer.onPeriodicEmit();
        assertThat(sink.lastWatermark(), is(watermark(0)));
        assertThat(sink.isIdle(), is(false));
    }

    /** The combined watermark is the minimum over the non-idle deferred outputs. */
    @Test
    public void multipleDeferredWatermark() {
        WatermarkOutput first = createDeferredOutput(multiplexer);
        WatermarkOutput second = createDeferredOutput(multiplexer);
        WatermarkOutput third = createDeferredOutput(multiplexer);

        first.emitWatermark(new Watermark(2));
        second.emitWatermark(new Watermark(5));
        third.markIdle();
        multiplexer.onPeriodicEmit();

        assertThat(sink.lastWatermark(), is(watermark(2)));
        assertThat(sink.isIdle(), is(false));
    }

    // ------------------------------------------------------------------------
    //  mixing immediate and deferred outputs
    // ------------------------------------------------------------------------

    /** An immediate update combines over state recorded through deferred outputs as well. */
    @Test
    public void immediateUpdatesTakeDeferredUpdatesIntoAccount() {
        WatermarkOutput immediate = createImmediateOutput(multiplexer);
        WatermarkOutput deferred = createDeferredOutput(multiplexer);

        deferred.emitWatermark(new Watermark(5));
        assertThat(sink.lastWatermark(), is(nullValue()));

        immediate.emitWatermark(new Watermark(2));
        assertThat(sink.lastWatermark(), is(watermark(2)));
    }

    /** A lower immediate update on the same output ID never pulls the watermark back. */
    @Test
    public void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
        int sharedId = multiplexer.registerNewOutput();
        WatermarkOutput immediate = multiplexer.getImmediateOutput(sharedId);
        WatermarkOutput deferred = multiplexer.getDeferredOutput(sharedId);

        deferred.emitWatermark(new Watermark(5));
        multiplexer.onPeriodicEmit();
        assertThat(sink.lastWatermark(), is(watermark(5)));

        immediate.emitWatermark(new Watermark(2));
        assertThat(sink.lastWatermark(), is(watermark(5)));

        multiplexer.onPeriodicEmit();
        assertThat(sink.lastWatermark(), is(watermark(5)));
    }

    /** A lower immediate update after a pending deferred one does not trigger an emission. */
    @Test
    public void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
        int sharedId = multiplexer.registerNewOutput();
        WatermarkOutput immediate = multiplexer.getImmediateOutput(sharedId);
        WatermarkOutput deferred = multiplexer.getDeferredOutput(sharedId);

        deferred.emitWatermark(new Watermark(5));
        immediate.emitWatermark(new Watermark(2));

        assertThat(sink.lastWatermark(), is(nullValue()));
    }

    // ------------------------------------------------------------------------
    //  helpers
    // ------------------------------------------------------------------------

    /**
     * Registers a new output and returns an immediate handle for it, so tests that only need the
     * handle don't have to deal with the output ID.
     */
    private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
        return multiplexer.getImmediateOutput(multiplexer.registerNewOutput());
    }

    /**
     * Registers a new output and returns a deferred handle for it, so tests that only need the
     * handle don't have to deal with the output ID.
     */
    private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
        return multiplexer.getDeferredOutput(multiplexer.registerNewOutput());
    }

    private static TestingWatermarkOutput createTestingWatermarkOutput() {
        return new TestingWatermarkOutput();
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Register to comment