From 59714b9d6addb1dbf2171cab937a0e3fec52f2b1 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Sat, 16 May 2020 20:07:39 +0200 Subject: [PATCH] [FLINK-17766] Use checkpoint lock instead of fine-grained locking in Kafka AbstractFetcher Before, we were locking on the partition state object itself to prevent concurrent access (and to make sure that changes are visible across threads). However, after recent changes we hold the checkpoint lock for emitting the whole "bundle" of records from Kafka. We can now also just use the checkpoint lock in the periodic emitter callback and then don't need the fine-grained locking on the state for record emission. --- .../kafka/internals/AbstractFetcher.java | 38 ++++++------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 6c0066af43f..9ad685cd42e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -195,6 +195,7 @@ public abstract class AbstractFetcher { // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) { PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter<>( + checkpointLock, subscribedPartitionStates, watermarkOutputMultiplexer, processingTimeProvider, @@ -347,27 +348,11 @@ public abstract class AbstractFetcher { synchronized (checkpointLock) { T record; while ((record = records.poll()) != null) { - long timestamp; - - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (partitionState) { - - // You would expect that we don't have to do this under lock. You would be wrong: - // A WatermarkStrategy can wrap an old-style combined - // timestamp extractor/watermark assigner, in which case the TimestampAssigner and - // WatermarkGenerator wrap one and the same object, where extracting the timestamp - // updates the internal state of the assigner. - timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp); - } + long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp); sourceContext.collectWithTimestamp(record, timestamp); - // TODO: not sure it's a good idea to split it into two Synchronized blocks, but - // we have to move the onEvent after the collect call, otherwise the WM would - // be emitted before the record - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (partitionState) { - partitionState.onEvent(record, timestamp); - } + // this might emit a watermark, so do it after emitting the record + partitionState.onEvent(record, timestamp); } partitionState.setOffset(offset); } @@ -539,6 +524,8 @@ public abstract class AbstractFetcher { */ private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback { + private final Object checkpointLock; + private final List> allPartitions; private final WatermarkOutputMultiplexer watermarkOutputMultiplexer; @@ -550,10 +537,12 @@ public abstract class AbstractFetcher { //------------------------------------------------- PeriodicWatermarkEmitter( + Object checkpointLock, List> allPartitions, WatermarkOutputMultiplexer watermarkOutputMultiplexer, ProcessingTimeService timerService, long autoWatermarkInterval) { + this.checkpointLock = checkpointLock; this.allPartitions = checkNotNull(allPartitions); this.watermarkOutputMultiplexer = watermarkOutputMultiplexer; this.timerService = checkNotNull(timerService); @@ -569,19 +558,14 @@ public abstract class AbstractFetcher { @Override public void onProcessingTime(long timestamp) { - for (KafkaTopicPartitionState state : allPartitions) { - - // we do this under the state lock, to prevent concurrent modification to any - // internal variables - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (state) { + synchronized (checkpointLock) { + for (KafkaTopicPartitionState state : allPartitions) { state.onPeriodicEmit(); } + watermarkOutputMultiplexer.onPeriodicEmit(); } - watermarkOutputMultiplexer.onPeriodicEmit(); - // schedule the next watermark timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); } -- GitLab