[FLINK-17766] Use checkpoint lock instead of fine-grained locking in Kafka AbstractFetcher

Before, we were locking on the partition state object itself to prevent
concurrent access (and to make sure that changes are visible across
threads). However, after recent changes we hold the checkpoint lock for
emitting the whole "bundle" of records from Kafka. We can now also just
use the checkpoint lock in the periodic emitter callback and no longer
need the fine-grained locking on the partition state for record emission.
Parent 5be27b09
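The change can be pictured with a minimal, self-contained sketch before reading the diff. The class and method names below (`CheckpointLockSketch`, `PartitionStateSketch`, `emitBundle`) are made up for illustration and are not the real Flink types; the point is only that a whole bundle of records is emitted while holding one checkpoint lock, which is what makes per-partition-state `synchronized` blocks unnecessary.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the locking scheme described in the commit message,
// not the actual AbstractFetcher code.
public class CheckpointLockSketch {

	/** Stand-in for the per-partition state whose timestamp/watermark logic is stateful. */
	static final class PartitionStateSketch {
		private long maxTimestamp = Long.MIN_VALUE;
		private long offset;

		long extractTimestamp(long kafkaTimestamp) {
			// Mutates internal state, hence it must run under the same lock as emission.
			maxTimestamp = Math.max(maxTimestamp, kafkaTimestamp);
			return kafkaTimestamp;
		}

		void setOffset(long offset) {
			this.offset = offset;
		}
	}

	private final Object checkpointLock = new Object();

	/** Emits one bundle of records under the single coarse checkpoint lock. */
	void emitBundle(Queue<String> records, long kafkaTimestamp, long offset, PartitionStateSketch state) {
		synchronized (checkpointLock) {
			String record;
			while ((record = records.poll()) != null) {
				long timestamp = state.extractTimestamp(kafkaTimestamp);
				System.out.println("emit " + record + " @ " + timestamp);
			}
			state.setOffset(offset);
		}
	}

	public static void main(String[] args) {
		Queue<String> bundle = new ArrayDeque<>(java.util.List.of("a", "b"));
		new CheckpointLockSketch().emitBundle(bundle, 42L, 100L, new PartitionStateSketch());
	}
}
```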
@@ -195,6 +195,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
 			PeriodicWatermarkEmitter<T, KPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
+				checkpointLock,
 				subscribedPartitionStates,
 				watermarkOutputMultiplexer,
 				processingTimeProvider,
@@ -347,27 +348,11 @@ public abstract class AbstractFetcher<T, KPH> {
 		synchronized (checkpointLock) {
 			T record;
 			while ((record = records.poll()) != null) {
-				long timestamp;
-				//noinspection SynchronizationOnLocalVariableOrMethodParameter
-				synchronized (partitionState) {
-					// You would expect that we don't have to do this under lock. You would be wrong:
-					// A WatermarkStrategy can wrap an old-style combined
-					// timestamp extractor/watermark assigner, in which case the TimestampAssigner and
-					// WatermarkGenerator wrap one and the same object, where extracting the timestamp
-					// updates the internal state of the assigner.
-					timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
-				}
+				long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
 				sourceContext.collectWithTimestamp(record, timestamp);
-				// TODO: not sure it's a good idea to split it into two Synchronized blocks, but
-				// we have to move the onEvent after the collect call, otherwise the WM would
-				// be emitted before the record
-				//noinspection SynchronizationOnLocalVariableOrMethodParameter
-				synchronized (partitionState) {
-					partitionState.onEvent(record, timestamp);
-				}
+				// this might emit a watermark, so do it after emitting the record
+				partitionState.onEvent(record, timestamp);
 			}
 			partitionState.setOffset(offset);
 		}
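The comment deleted in the hunk above explains why timestamp extraction must still happen inside the locked region: a `WatermarkStrategy` can wrap a legacy combined timestamp extractor / watermark assigner, so extracting a timestamp mutates the same object that later generates watermarks. A rough illustration of such a stateful extractor follows; the class and field names are invented for the example and are not Flink API.

```java
// Illustrative only: a legacy-style combined assigner where timestamp extraction
// updates the same state that watermark generation reads, so both calls must be
// serialized under one lock (now: the checkpoint lock).
final class StatefulAssignerSketch {
	private long currentMaxTimestamp = Long.MIN_VALUE;

	long extractTimestamp(long elementTimestamp) {
		currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
		return elementTimestamp;
	}

	long currentWatermark() {
		return currentMaxTimestamp - 1;
	}
}
```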
@@ -539,6 +524,8 @@ public abstract class AbstractFetcher<T, KPH> {
 	 */
 	private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeCallback {
+		private final Object checkpointLock;
 		private final List<KafkaTopicPartitionState<T, KPH>> allPartitions;
 		private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
@@ -550,10 +537,12 @@ public abstract class AbstractFetcher<T, KPH> {
 		//-------------------------------------------------
 		PeriodicWatermarkEmitter(
+				Object checkpointLock,
 				List<KafkaTopicPartitionState<T, KPH>> allPartitions,
 				WatermarkOutputMultiplexer watermarkOutputMultiplexer,
 				ProcessingTimeService timerService,
 				long autoWatermarkInterval) {
+			this.checkpointLock = checkpointLock;
 			this.allPartitions = checkNotNull(allPartitions);
 			this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
 			this.timerService = checkNotNull(timerService);
@@ -569,19 +558,14 @@ public abstract class AbstractFetcher<T, KPH> {
 		@Override
 		public void onProcessingTime(long timestamp) {
-			for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
-				// we do this under the state lock, to prevent concurrent modification to any
-				// internal variables
-				//noinspection SynchronizationOnLocalVariableOrMethodParameter
-				synchronized (state) {
+			synchronized (checkpointLock) {
+				for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
 					state.onPeriodicEmit();
 				}
+				watermarkOutputMultiplexer.onPeriodicEmit();
+			}
-			watermarkOutputMultiplexer.onPeriodicEmit();
 			// schedule the next watermark
 			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
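With the periodic emitter also synchronizing on the checkpoint lock, the timer callback, the record-emission loop, and checkpointing all contend on the same object, so none of them can observe half-updated watermark state. The sketch below illustrates that interaction under assumed names (`PeriodicEmitSketch`, `snapshotState`); it is a simplified stand-in, not the real Flink classes.

```java
import java.util.List;

// Sketch of why one shared lock is sufficient; assumed names, not Flink API.
final class PeriodicEmitSketch {
	private final Object checkpointLock;
	private final List<Runnable> partitionEmitters;

	PeriodicEmitSketch(Object checkpointLock, List<Runnable> partitionEmitters) {
		this.checkpointLock = checkpointLock;
		this.partitionEmitters = partitionEmitters;
	}

	/** Called from the processing-time timer thread, like onProcessingTime above. */
	void onPeriodicEmit() {
		synchronized (checkpointLock) {
			for (Runnable emitter : partitionEmitters) {
				emitter.run(); // stands in for state.onPeriodicEmit()
			}
			// combined watermark emission would happen here, still under the lock
		}
	}

	/** Called on a checkpoint from the task thread; cannot interleave with the above. */
	void snapshotState() {
		synchronized (checkpointLock) {
			// read offsets and watermark state consistently here
		}
	}
}
```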