diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 18a6dba861c162f02aa774452fb08f6084d4a426..1821c15ee5b2f76a2c86335b54762bcf8cbb9f41 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -18,11 +18,10 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
@@ -245,8 +244,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
	protected AbstractFetcher<T, ?> createFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			StreamingRuntimeContext runtimeContext,
			OffsetCommitMode offsetCommitMode,
			MetricGroup consumerMetricGroup,
@@ -264,8 +262,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
		return new Kafka010Fetcher<>(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
-			watermarksPeriodic,
-			watermarksPunctuated,
+			watermarkStrategy,
			runtimeContext.getProcessingTimeService(),
			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
			runtimeContext.getUserCodeClassLoader(),
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index ac8d27c7a9e8ba12a5b5591fc9b34fa9d74cd4fd..c5505bed7204f42c69b30b7847941187dbe1bdef 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -81,8 +80,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
	public Kafka010Fetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			ProcessingTimeService processingTimeProvider,
			long autoWatermarkInterval,
			ClassLoader userCodeClassLoader,
@@ -97,8 +95,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
		super(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
-			watermarksPeriodic,
-			watermarksPunctuated,
+			watermarkStrategy,
			processingTimeProvider,
			autoWatermarkInterval,
			userCodeClassLoader,
@@ -141,7 +138,7 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
			final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
			// get the records for each topic partition
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
+			for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
 
				List<ConsumerRecord<byte[], byte[]>> partitionRecords =
					records.records(partition.getKafkaPartitionHandle());
@@ -204,12 +201,11 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
			Map<KafkaTopicPartition, Long> offsets,
			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 
-		@SuppressWarnings("unchecked")
-		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
+		List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();
 
		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+		for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
			if (lastProcessedOffset != null) {
				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
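Note on the fetcher signature above: the two mutually exclusive assigner parameters collapse into one `SerializedValue<WatermarkStrategy<T>>`. Keeping the strategy in serialized form is deliberate — every deserialization yields an independent copy, which is what makes per-partition watermarking safe. A minimal sketch of that behavior (`classLoader` and the `String` record type are illustrative stand-ins, not names from this change):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.util.SerializedValue;

import java.io.IOException;

// Each call to deserializeValue() produces a fresh strategy instance, so the
// watermark generator state of one Kafka partition can never leak into another.
static WatermarkStrategy<String> copyForPartition(
        SerializedValue<WatermarkStrategy<String>> serialized,
        ClassLoader classLoader) throws IOException, ClassNotFoundException {
    return serialized.deserializeValue(classLoader); // independent instance per call
}
```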
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 066cebb6c39e642384c14d83f8e0689c08a673ce..6409f572e29683d63c282f714149a69196586cdf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 * to the KafkaConsumer calls that change signature.
 */
@Internal
-public class KafkaConsumerThread extends Thread {
+public class KafkaConsumerThread<T> extends Thread {
 
	/** Logger for this consumer. */
	private final Logger log;
@@ -82,7 +82,7 @@ public class KafkaConsumerThread extends Thread {
	private final Properties kafkaProperties;
 
	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue;
 
	/** The maximum number of milliseconds to wait for a fetch batch. */
	private final long pollTimeout;
@@ -128,7 +128,7 @@ public class KafkaConsumerThread extends Thread {
			Logger log,
			Handover handover,
			Properties kafkaProperties,
-			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
+			ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue,
			String threadName,
			long pollTimeout,
			boolean useMetrics,
@@ -214,7 +214,7 @@ public class KafkaConsumerThread extends Thread {
		// reused variable to hold found unassigned new partitions.
		// found partitions are not carried across loops using this variable;
		// they are carried across via re-adding them to the unassigned partitions queue
-		List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
+		List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions;
 
		// main fetch loop
		while (running) {
@@ -391,7 +391,7 @@ public class KafkaConsumerThread extends Thread {
	 * <p>This method is exposed for testing purposes.
	 */
	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+	void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions) throws Exception {
		if (newPartitions.size() == 0) {
			return;
		}
@@ -433,7 +433,7 @@ public class KafkaConsumerThread extends Thread {
				// been replaced with actual offset values yet, or
				// (3) the partition was newly discovered after startup;
				// replace those with actual offsets, according to what the sentinel value represent.
-				for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
+				for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
					if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
						consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
						newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
@@ -472,7 +472,7 @@ public class KafkaConsumerThread extends Thread {
			hasBufferedWakeup = false;
 
			// re-add all new partitions back to the unassigned partitions queue to be picked up again
-			for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+			for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
				unassignedPartitionsQueue.add(newPartition);
			}
@@ -545,9 +545,9 @@ public class KafkaConsumerThread extends Thread {
	//  Utilities
	// ------------------------------------------------------------------------
 
-	private static List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<TopicPartition>> partitions) {
+	private static <T> List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> partitions) {
		ArrayList<TopicPartition> result = new ArrayList<>(partitions.size());
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+		for (KafkaTopicPartitionState<T, TopicPartition> p : partitions) {
			result.add(p.getKafkaPartitionHandle());
		}
		return result;
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 97d3d04226b8757f7288f850ded1f5260ae1680e..8a2632add48a8bfc7daab94d791f009da2a76d7a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -120,8 +120,7 @@ public class Kafka010FetcherTest {
		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
				sourceContext,
				partitionsWithInitialOffsets,
-				null, /* periodic assigner */
-				null, /* punctuated assigner */
+				null, /* watermark strategy */
				new TestProcessingTimeService(),
				10,
				getClass().getClassLoader(),
@@ -257,8 +256,7 @@ public class Kafka010FetcherTest {
		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
				sourceContext,
				partitionsWithInitialOffsets,
-				null, /* periodic assigner */
-				null, /* punctuated assigner */
+				null, /* watermark strategy */
				new TestProcessingTimeService(),
				10,
				getClass().getClassLoader(),
@@ -372,8 +370,7 @@ public class Kafka010FetcherTest {
		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
				sourceContext,
				partitionsWithInitialOffsets,
-				null, /* periodic watermark extractor */
-				null, /* punctuated watermark extractor */
+				null, /* watermark strategy */
				new TestProcessingTimeService(),
				10, /* watermark interval */
				this.getClass().getClassLoader(),
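The remaining changes in this file and the test updates below are mechanical: the partition state gains the record type parameter, because the state object can now carry typed event-time hooks (see `KafkaTopicPartitionStateWithWatermarkGenerator` further down). A minimal sketch of the re-typed declarations, with `String` standing in for the record type:

```java
// Before this change the partition state was generic only in the partition
// handle (KPH, here Kafka's TopicPartition); the record type now rides along
// so that TimestampAssigner<T> / WatermarkGenerator<T> can be attached to it.
ClosableBlockingQueue<KafkaTopicPartitionState<String, TopicPartition>> queue =
    new ClosableBlockingQueue<>();

KafkaTopicPartitionState<String, TopicPartition> state = new KafkaTopicPartitionState<>(
    new KafkaTopicPartition("topic", 0), new TopicPartition("topic", 0));
state.setOffset(23L); // offsets in state represent the last processed record
queue.add(state);
```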
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index e2cc95727d1bb1e34f339a9ff144237e5d85e3dd..92701de68e45cc69bc88d79b724a05269cffd719 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -96,10 +96,10 @@ public class KafkaConsumerThreadTest {
		// setup latch so the test waits until testThread is blocked on getBatchBlocking method
		final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
-			new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
+			new ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>>() {
				@Override
-				public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
+				public List<KafkaTopicPartitionState<Object, TopicPartition>> getBatchBlocking() throws InterruptedException {
					getBatchBlockingInvoked.trigger();
					return super.getBatchBlocking();
				}
			};
@@ -129,15 +129,15 @@ public class KafkaConsumerThreadTest {
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		newPartition1.setOffset(23L);
 
-		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		newPartition2.setOffset(31L);
 
-		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
+		final List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
		newPartitions.add(newPartition1);
		newPartitions.add(newPartition2);
@@ -155,10 +155,10 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			unassignedPartitionsQueue.add(newPartition);
		}
@@ -175,7 +175,7 @@ public class KafkaConsumerThreadTest {
 
		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
 
			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
@@ -202,15 +202,15 @@ public class KafkaConsumerThreadTest {
 
		// -------- new partitions with undefined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
+		final List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
		newPartitions.add(newPartition1);
		newPartitions.add(newPartition2);
@@ -233,10 +233,10 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			unassignedPartitionsQueue.add(newPartition);
		}
@@ -257,7 +257,7 @@ public class KafkaConsumerThreadTest {
 
		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
 
			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
@@ -284,25 +284,25 @@ public class KafkaConsumerThreadTest {
 
		// -------- old partitions --------
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		oldPartition1.setOffset(23L);
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		oldPartition2.setOffset(32L);
 
-		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
		oldPartitions.add(oldPartition1);
		oldPartitions.add(oldPartition2);
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
		newPartition.setOffset(29L);
 
-		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> totalPartitions = new ArrayList<>(3);
		totalPartitions.add(oldPartition1);
		totalPartitions.add(oldPartition2);
		totalPartitions.add(newPartition);
@@ -311,7 +311,7 @@ public class KafkaConsumerThreadTest {
		// has initial assignments
		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
		}
@@ -324,7 +324,7 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
		unassignedPartitionsQueue.add(newPartition);
@@ -343,7 +343,7 @@ public class KafkaConsumerThreadTest {
		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
 
		// old partitions should be re-seeked to their previous positions
-		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> partition : totalPartitions) {
			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
 
			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
@@ -370,25 +370,25 @@ public class KafkaConsumerThreadTest {
 
		// -------- old partitions --------
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		oldPartition1.setOffset(23L);
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		oldPartition2.setOffset(32L);
 
-		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
		oldPartitions.add(oldPartition1);
		oldPartitions.add(oldPartition2);
 
		// -------- new partitions with undefined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
		newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> totalPartitions = new ArrayList<>(3);
		totalPartitions.add(oldPartition1);
		totalPartitions.add(oldPartition2);
		totalPartitions.add(newPartition);
@@ -397,7 +397,7 @@ public class KafkaConsumerThreadTest {
		// has initial assignments
		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
		}
@@ -414,7 +414,7 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
		unassignedPartitionsQueue.add(newPartition);
@@ -436,7 +436,7 @@ public class KafkaConsumerThreadTest {
		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
 
		// old partitions should be re-seeked to their previous positions
-		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> partition : totalPartitions) {
			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
 
			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
@@ -466,21 +466,21 @@ public class KafkaConsumerThreadTest {
 
		// -------- old partitions --------
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		oldPartition1.setOffset(23L);
 
-		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		oldPartition2.setOffset(32L);
 
-		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> oldPartitions = new ArrayList<>(2);
		oldPartitions.add(oldPartition1);
		oldPartitions.add(oldPartition2);
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
		newPartition.setOffset(29L);
@@ -488,7 +488,7 @@ public class KafkaConsumerThreadTest {
		// initial assignments
		final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
			mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
		}
@@ -501,7 +501,7 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
		unassignedPartitionsQueue.add(newPartition);
@@ -525,7 +525,7 @@ public class KafkaConsumerThreadTest {
 
		assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> oldPartition : oldPartitions) {
			assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
			assertEquals(
				oldPartition.getOffset() + 1,
@@ -554,15 +554,15 @@ public class KafkaConsumerThreadTest {
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
		newPartitions.add(newPartition1);
		newPartitions.add(newPartition2);
@@ -585,10 +585,10 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			unassignedPartitionsQueue.add(newPartition);
		}
@@ -634,15 +634,15 @@ public class KafkaConsumerThreadTest {
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(2);
		newPartitions.add(newPartition1);
		newPartitions.add(newPartition2);
@@ -669,10 +669,10 @@ public class KafkaConsumerThreadTest {
 
		// -------- setup new partitions to be polled from the unassigned partitions queue --------
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			unassignedPartitionsQueue.add(newPartition);
		}
@@ -700,7 +700,7 @@ public class KafkaConsumerThreadTest {
 
		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
 
			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
@@ -742,17 +742,17 @@ public class KafkaConsumerThreadTest {
 
		// -------- new partitions with defined offsets --------
 
-		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
+		KafkaTopicPartitionState<Object, TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 
-		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(1);
+		List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions = new ArrayList<>(1);
		newPartitions.add(newPartition1);
 
-		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+		final ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue =
			new ClosableBlockingQueue<>();
 
-		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+		for (KafkaTopicPartitionState<Object, TopicPartition> newPartition : newPartitions) {
			unassignedPartitionsQueue.add(newPartition);
		}
@@ -808,7 +808,7 @@ public class KafkaConsumerThreadTest {
	 * partition reassignment, so that tests are eligible to setup various conditions before the reassignment happens
	 * and inspect reassignment results after it is completed.
	 */
-	private static class TestKafkaConsumerThread extends KafkaConsumerThread {
+	private static class TestKafkaConsumerThread extends KafkaConsumerThread<Object> {
 
		private final Consumer<byte[], byte[]> mockConsumer;
		private final MultiShotLatch preReassignmentLatch = new MultiShotLatch();
@@ -818,7 +818,7 @@ public class KafkaConsumerThreadTest {
		public TestKafkaConsumerThread(
				Consumer<byte[], byte[]> mockConsumer,
-				ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
+				ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue,
				Handover handover) {
 
			super(
@@ -858,7 +858,7 @@ public class KafkaConsumerThreadTest {
		}
 
		@Override
-		void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+		void reassignPartitions(List<KafkaTopicPartitionState<Object, TopicPartition>> newPartitions) throws Exception {
			// triggers blocking calls on waitPartitionReassignmentInvoked()
			preReassignmentLatch.trigger();
@@ -1103,7 +1103,7 @@ public class KafkaConsumerThreadTest {
		public TestKafkaConsumerThreadRateLimit(Logger log, Handover handover,
			Properties kafkaProperties,
-			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
+			ClosableBlockingQueue<KafkaTopicPartitionState<Object, TopicPartition>> unassignedPartitionsQueue,
			String threadName, long pollTimeout, boolean useMetrics,
			MetricGroup consumerMetricGroup, MetricGroup subtaskMetricGroup,
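The next file carries the user-facing change: both deprecated `assignTimestampsAndWatermarks` overloads now funnel into the new `WatermarkStrategy`-based method through adapter classes, so only a single field has to be guarded against double assignment. A hedged migration sketch — `consumer` and `MyEvent` are illustrative, and the `WatermarkStrategies` builder is the one referenced in the javadoc below:

```java
// Deprecated path — still works, internally wrapped in an adapter strategy:
consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
    @Override
    public long extractAscendingTimestamp(MyEvent event) {
        return event.getTimestampMillis(); // hypothetical accessor
    }
});

// New path — one strategy object instead of periodic/punctuated variants:
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategies.<MyEvent>forMonotonousTimestamps().build());
```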
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index ee571ef236765171ef27bf6ee05c0931f2329d27..46688f68ae4b2b89286f745ceb7718eecdbdb843 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -53,6 +54,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
@@ -127,15 +130,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
	private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
 
-	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics.
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies. */
-	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
-
-	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics.
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies. */
-	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
+	/**
+	 * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
+	 * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
+	 * it into multiple copies.
+	 */
+	private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
 
	/**
	 * User-set flag determining whether or not to commit on checkpoints.
@@ -273,6 +273,39 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	// ------------------------------------------------------------------------
	//  Configuration
	// ------------------------------------------------------------------------
 
+	/**
+	 * Sets the given {@link WatermarkStrategy} on this consumer. It will be used to assign
+	 * timestamps to records and generate watermarks to signal event time progress.
+	 *
+	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source,
+	 * per Kafka partition (which you can do by using this method), allows users to exploit
+	 * the per-partition characteristics.
+	 *
+	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+	 * the streams from the partitions are unioned in a "first come first serve" fashion.
+	 * Per-partition characteristics are usually lost that way. For example, if the timestamps are
+	 * strictly ascending per Kafka partition, they will not be strictly ascending in the resulting
+	 * Flink DataStream, if the parallel source subtask reads more than one partition.
+	 *
+	 * <p>Common watermark generation patterns can be found in the
+	 * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} class.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+			WatermarkStrategy<T> watermarkStrategy) {
+		checkNotNull(watermarkStrategy);
+
+		try {
+			ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+			this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
+		} catch (Exception e) {
+			throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
+		}
+
+		return this;
+	}
+
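Usage sketch for the method just added, assuming the `WatermarkStrategies` patterns referenced in its javadoc; the event type, schema class, and topic name are illustrative:

```java
// Watermarks are generated per Kafka partition inside the source, so a
// per-partition property like ascending timestamps survives even when one
// source subtask reads several partitions.
FlinkKafkaConsumer010<MyEvent> kafkaSource = new FlinkKafkaConsumer010<>(
    "events", new MyEventDeserializationSchema(), kafkaProperties);

kafkaSource.assignTimestampsAndWatermarks(
    WatermarkStrategies.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).build());

DataStream<MyEvent> stream = env.addSource(kafkaSource);
```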
	/**
	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
@@ -290,19 +323,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	 *
	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
	 *
+	 * <p>This method uses the deprecated watermark generator interfaces. Please switch to
+	 * {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the
+	 * new interfaces instead. The new interfaces support watermark idleness and no longer need
+	 * to differentiate between "periodic" and "punctuated" watermarks.
+	 *
+	 * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+	 *
	 * @param assigner The timestamp assigner / watermark generator to use.
	 * @return The consumer object, to allow function chaining.
	 */
+	@Deprecated
	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
		checkNotNull(assigner);
 
-		if (this.periodicWatermarkAssigner != null) {
-			throw new IllegalStateException("A periodic watermark emitter has already been set.");
+		if (this.watermarkStrategy != null) {
+			throw new IllegalStateException("Some watermark strategy has already been set.");
		}
+
		try {
			ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
-			return this;
+			final WatermarkStrategy<T> wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
+
+			return assignTimestampsAndWatermarks(wms);
		} catch (Exception e) {
			throw new IllegalArgumentException("The given assigner is not serializable", e);
		}
	}
@@ -325,19 +368,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	 *
	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
	 *
+	 * <p>This method uses the deprecated watermark generator interfaces. Please switch to
+	 * {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the
+	 * new interfaces instead. The new interfaces support watermark idleness and no longer need
+	 * to differentiate between "periodic" and "punctuated" watermarks.
+	 *
+	 * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+	 *
	 * @param assigner The timestamp assigner / watermark generator to use.
	 * @return The consumer object, to allow function chaining.
	 */
+	@Deprecated
	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
		checkNotNull(assigner);
 
-		if (this.punctuatedWatermarkAssigner != null) {
-			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+		if (this.watermarkStrategy != null) {
+			throw new IllegalStateException("Some watermark strategy has already been set.");
		}
+
		try {
			ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
-			return this;
+			final WatermarkStrategy<T> wms = new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
+
+			return assignTimestampsAndWatermarks(wms);
		} catch (Exception e) {
			throw new IllegalArgumentException("The given assigner is not serializable", e);
		}
	}
@@ -696,8 +749,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
-				periodicWatermarkAssigner,
-				punctuatedWatermarkAssigner,
+				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
@@ -993,8 +1045,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	 *
	 * @param sourceContext The source context to emit data to.
	 * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets.
-	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
-	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
+	 * @param watermarkStrategy Optional, a serialized WatermarkStrategy.
	 * @param runtimeContext The task's runtime context.
	 *
	 * @return The instantiated fetcher
@@ -1004,8 +1055,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
	protected abstract AbstractFetcher<T, ?> createFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			StreamingRuntimeContext runtimeContext,
			OffsetCommitMode offsetCommitMode,
			MetricGroup kafkaMetricGroup,
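The `AbstractFetcher` changes below replace the hand-rolled cross-partition minimum computation with a `WatermarkOutputMultiplexer`. A condensed view of the multiplexer mechanics the new code relies on — the method names are the ones invoked in the hunks below; `downstream` stands in for the `SourceContext`-backed output:

```java
// The multiplexer tracks the minimum watermark across all registered
// per-partition outputs and forwards the combined result downstream.
WatermarkOutputMultiplexer mux = new WatermarkOutputMultiplexer(downstream);

int partitionOutputId = mux.registerNewOutput();
WatermarkOutput immediate = mux.getImmediateOutput(partitionOutputId);
WatermarkOutput deferred = mux.getDeferredOutput(partitionOutputId);

immediate.emitWatermark(new Watermark(42L)); // combined watermark may advance right away
deferred.emitWatermark(new Watermark(43L));  // buffered until...
mux.onPeriodicEmit();                        // ...the periodic emitter flushes and re-combines
```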
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index f06a6c5dd32a13e9bbd6f58700e814b9383b3de8..6c0066af43f0f05dcb706fb9f09f8ed7a3052d3b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
@@ -64,20 +64,30 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class AbstractFetcher<T, KPH> {
 
	private static final int NO_TIMESTAMPS_WATERMARKS = 0;
-	private static final int PERIODIC_WATERMARKS = 1;
-	private static final int PUNCTUATED_WATERMARKS = 2;
+	private static final int WITH_WATERMARK_GENERATOR = 1;
 
	// ------------------------------------------------------------------------
 
	/** The source context to emit records and watermarks to. */
	protected final SourceContext<T> sourceContext;
 
+	/**
+	 * Wrapper around our SourceContext for allowing the {@link org.apache.flink.api.common.eventtime.WatermarkGenerator}
+	 * to emit watermarks and mark idleness.
+	 */
+	protected final WatermarkOutput watermarkOutput;
+
+	/**
+	 * {@link WatermarkOutputMultiplexer} for supporting per-partition watermark generation.
+	 */
+	private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
+
	/** The lock that guarantees that record emission and state updates are atomic,
	 * from the view of taking a checkpoint. */
	private final Object checkpointLock;
 
	/** All partitions (and their state) that this fetcher is subscribed to. */
-	private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
+	private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
 
	/**
	 * Queue of partitions that are not yet assigned to any Kafka clients for consuming.
@@ -88,31 +98,21 @@ public abstract class AbstractFetcher<T, KPH> {
	 * <p>All partitions added to this queue are guaranteed to have been added
	 * to {@link #subscribedPartitionStates} already.
	 */
-	protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue;
+	protected final ClosableBlockingQueue<KafkaTopicPartitionState<T, KPH>> unassignedPartitionsQueue;
 
	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
	private final int timestampWatermarkMode;
 
	/**
-	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics.
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies.
-	 */
-	private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
-
-	/**
-	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics.
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies.
+	 * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
+	 * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
+	 * it into multiple copies.
	 */
-	private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
+	private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
 
	/** User class loader used to deserialize watermark assigners. */
	private final ClassLoader userCodeClassLoader;
 
-	/** Only relevant for punctuated watermarks: The current cross partition watermark. */
-	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
-
	// ------------------------------------------------------------------------
	//  Metrics
	// ------------------------------------------------------------------------
@@ -141,14 +141,15 @@ public abstract class AbstractFetcher<T, KPH> {
	protected AbstractFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			ProcessingTimeService processingTimeProvider,
			long autoWatermarkInterval,
			ClassLoader userCodeClassLoader,
			MetricGroup consumerMetricGroup,
			boolean useMetrics) throws Exception {
		this.sourceContext = checkNotNull(sourceContext);
+		this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
+		this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
		this.checkpointLock = sourceContext.getCheckpointLock();
		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
@@ -157,23 +158,12 @@ public abstract class AbstractFetcher<T, KPH> {
		this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
		this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
 
-		// figure out what we watermark mode we will be using
-		this.watermarksPeriodic = watermarksPeriodic;
-		this.watermarksPunctuated = watermarksPunctuated;
+		this.watermarkStrategy = watermarkStrategy;
 
-		if (watermarksPeriodic == null) {
-			if (watermarksPunctuated == null) {
-				// simple case, no watermarks involved
-				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
-			} else {
-				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
-			}
+		if (watermarkStrategy == null) {
+			timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
		} else {
-			if (watermarksPunctuated == null) {
-				timestampWatermarkMode = PERIODIC_WATERMARKS;
-			} else {
-				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
-			}
+			timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
		}
 
		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
@@ -182,19 +172,18 @@ public abstract class AbstractFetcher<T, KPH> {
		this.subscribedPartitionStates = createPartitionStateHolders(
				seedPartitionsWithInitialOffsets,
				timestampWatermarkMode,
-				watermarksPeriodic,
-				watermarksPunctuated,
+				watermarkStrategy,
				userCodeClassLoader);
 
		// check that all seed partition states have a defined offset
-		for (KafkaTopicPartitionState<KPH> partitionState : subscribedPartitionStates) {
+		for (KafkaTopicPartitionState<T, KPH> partitionState : subscribedPartitionStates) {
			if (!partitionState.isOffsetDefined()) {
				throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
			}
		}
 
		// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
+		for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
			unassignedPartitionsQueue.add(partition);
		}
@@ -204,11 +193,10 @@ public abstract class AbstractFetcher<T, KPH> {
		}
 
		// if we have periodic watermarks, kick off the interval scheduler
-		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			@SuppressWarnings("unchecked")
-			PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
+		if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
+			PeriodicWatermarkEmitter<T, KPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
					subscribedPartitionStates,
-					sourceContext,
+					watermarkOutputMultiplexer,
					processingTimeProvider,
					autoWatermarkInterval);
@@ -230,19 +218,18 @@ public abstract class AbstractFetcher<T, KPH> {
	 * @param newPartitions discovered partitions to add
	 */
	public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
-		List<KafkaTopicPartitionState<KPH>> newPartitionStates = createPartitionStateHolders(
+		List<KafkaTopicPartitionState<T, KPH>> newPartitionStates = createPartitionStateHolders(
				newPartitions,
				KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET,
				timestampWatermarkMode,
-				watermarksPeriodic,
-				watermarksPunctuated,
+				watermarkStrategy,
				userCodeClassLoader);
 
		if (useMetrics) {
			registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
		}
 
-		for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
+		for (KafkaTopicPartitionState<T, KPH> newPartitionState : newPartitionStates) {
			// the ordering is crucial here; first register the state holder, then
			// push it to the partitions queue to be read
			subscribedPartitionStates.add(newPartitionState);
@@ -259,7 +246,7 @@ public abstract class AbstractFetcher<T, KPH> {
	 *
	 * @return All subscribed partitions.
	 */
-	protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates() {
+	protected final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates() {
		return subscribedPartitionStates;
	}
@@ -333,7 +320,7 @@ public abstract class AbstractFetcher<T, KPH> {
		assert Thread.holdsLock(checkpointLock);
 
		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.size());
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
+		for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
		}
		return state;
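The hunk below rewires per-record emission onto the partition state's new event-time hooks. The reason each partition gets both an immediate and a deferred output: punctuated-style generators emit inside `onEvent`, while periodic ones only write when `onPeriodicEmit` fires. A hedged sketch of a punctuated-style generator against the `WatermarkGenerator` interface (`MyEvent` and its marker flag are illustrative):

```java
// Emits a watermark the moment a special marker record arrives, instead of
// waiting for the periodic timer — this is what flows through the
// "immediate" output registered with the multiplexer.
public class OnMarkerWatermarks implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.isWatermarkMarker()) {            // hypothetical marker flag
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do; all watermarks are driven by marker records
    }
}
```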
@@ -352,7 +339,7 @@ public abstract class AbstractFetcher<T, KPH> {
	 */
	protected void emitRecordsWithTimestamps(
			Queue<T> records,
-			KafkaTopicPartitionState<KPH> partitionState,
+			KafkaTopicPartitionState<T, KPH> partitionState,
			long offset,
			long kafkaEventTimestamp) {
		// emit the records, using the checkpoint lock to guarantee
@@ -360,84 +347,29 @@ public abstract class AbstractFetcher<T, KPH> {
		synchronized (checkpointLock) {
			T record;
			while ((record = records.poll()) != null) {
-				// timestamps will be of the same size as records.
-				long timestamp = getTimestampForRecord(record, partitionState, kafkaEventTimestamp);
-				sourceContext.collectWithTimestamp(record, timestamp);
-				if (timestampWatermarkMode == PUNCTUATED_WATERMARKS) {
-					emitPunctuatedWatermark(record, timestamp, partitionState);
-				}
-			}
-			partitionState.setOffset(offset);
-		}
-	}
-
-	private void emitPunctuatedWatermark(
-			T record,
-			long timestamp,
-			KafkaTopicPartitionState<KPH> partitionState) {
-		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-				(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-		Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-		// if we also have a new per-partition watermark, check if that is also a
-		// new cross-partition watermark
-		if (newWatermark != null) {
-			updateMinPunctuatedWatermark(newWatermark);
-		}
-	}
-
-	protected long getTimestampForRecord(
-			T record,
-			KafkaTopicPartitionState<KPH> partitionState,
-			long kafkaEventTimestamp) {
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			return kafkaEventTimestamp;
-		} else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
-					(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-			// extract timestamp - this accesses/modifies the per-partition state inside the
-			// watermark generator instance, so we need to lock the access on the
-			// partition state. concurrent access can happen from the periodic emitter
-			//noinspection SynchronizationOnLocalVariableOrMethodParameter
-			synchronized (withWatermarksState) {
-				return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-			}
-		} else {
-			final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-					(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-			// only one thread ever works on accessing timestamps and watermarks
-			// from the punctuated extractor
-			return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-		}
-	}
+				long timestamp;
 
-	/**
-	 * Checks whether a new per-partition watermark is also a new cross-partition watermark.
-	 */
-	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-			long newMin = Long.MAX_VALUE;
-
-			for (KafkaTopicPartitionState<KPH> state : subscribedPartitionStates) {
-				@SuppressWarnings("unchecked")
-				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-
-				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
-			}
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (partitionState) {
+					// You would expect that we don't have to do this under lock. You would be wrong:
+					// A WatermarkStrategy can wrap an old-style combined
+					// timestamp extractor/watermark assigner, in which case the TimestampAssigner and
+					// WatermarkGenerator wrap one and the same object, where extracting the timestamp
+					// updates the internal state of the assigner.
+					timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
+				}
+				sourceContext.collectWithTimestamp(record, timestamp);
 
-			// double-check locking pattern
-			if (newMin > maxWatermarkSoFar) {
-				synchronized (checkpointLock) {
-					if (newMin > maxWatermarkSoFar) {
-						maxWatermarkSoFar = newMin;
-						sourceContext.emitWatermark(new Watermark(newMin));
-					}
+				// TODO: not sure it's a good idea to split it into two Synchronized blocks, but
+				// we have to move the onEvent after the collect call, otherwise the WM would
+				// be emitted before the record
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (partitionState) {
+					partitionState.onEvent(record, timestamp);
				}
			}
+			partitionState.setOffset(offset);
		}
	}
@@ -449,16 +381,15 @@ public abstract class AbstractFetcher<T, KPH> {
	 * Utility method that takes the topic partitions and creates the topic partition state
	 * holders, depending on the timestamp / watermark mode.
	 */
-	private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+	private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(
			Map<KafkaTopicPartition, Long> partitionsToInitialOffsets,
			int timestampWatermarkMode,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
 
		// CopyOnWrite as adding discovered partitions could happen in parallel
		// while different threads iterate the partitions list
-		List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
+		List<KafkaTopicPartitionState<T, KPH>> partitionStates = new CopyOnWriteArrayList<>();
 
		switch (timestampWatermarkMode) {
			case NO_TIMESTAMPS_WATERMARKS: {
@@ -466,7 +397,7 @@ public abstract class AbstractFetcher<T, KPH> {
					// create the kafka version specific partition handle
					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
 
-					KafkaTopicPartitionState<KPH> partitionState =
+					KafkaTopicPartitionState<T, KPH> partitionState =
							new KafkaTopicPartitionState<>(partitionEntry.getKey(), kafkaHandle);
					partitionState.setOffset(partitionEntry.getValue());
 
@@ -476,18 +407,26 @@ public abstract class AbstractFetcher<T, KPH> {
				return partitionStates;
			}
 
-			case PERIODIC_WATERMARKS: {
+			case WITH_WATERMARK_GENERATOR: {
				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+					WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
+							userCodeClassLoader);
 
-					AssignerWithPeriodicWatermarks<T> assignerInstance =
-							watermarksPeriodic.deserializeValue(userCodeClassLoader);
+					int outputId = watermarkOutputMultiplexer.registerNewOutput();
+					WatermarkOutput immediateOutput =
+							watermarkOutputMultiplexer.getImmediateOutput(outputId);
+					WatermarkOutput deferredOutput =
+							watermarkOutputMultiplexer.getDeferredOutput(outputId);
 
-					KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
-							new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+					KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
+							new KafkaTopicPartitionStateWithWatermarkGenerator<>(
								partitionEntry.getKey(),
								kafkaHandle,
-								assignerInstance);
+								deserializedWatermarkStrategy.createTimestampAssigner(() -> consumerMetricGroup),
+								deserializedWatermarkStrategy.createWatermarkGenerator(() -> consumerMetricGroup),
+								immediateOutput,
+								deferredOutput);
 
					partitionState.setOffset(partitionEntry.getValue());
 
@@ -497,26 +436,6 @@ public abstract class AbstractFetcher<T, KPH> {
				return partitionStates;
			}
 
-			case PUNCTUATED_WATERMARKS: {
-				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
-
-					AssignerWithPunctuatedWatermarks<T> assignerInstance =
-							watermarksPunctuated.deserializeValue(userCodeClassLoader);
-
-					KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partitionState =
-							new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-								partitionEntry.getKey(),
-								kafkaHandle,
-								assignerInstance);
-
-					partitionState.setOffset(partitionEntry.getValue());
-
-					partitionStates.add(partitionState);
-				}
-
-				return partitionStates;
-			}
 
			default:
				// cannot happen, add this as a guard for the future
				throw new RuntimeException();
@@ -524,15 +443,14 @@ public abstract class AbstractFetcher<T, KPH> {
	}
 
	/**
-	 * Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, SerializedValue, ClassLoader)}
+	 * Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, ClassLoader)}
	 * that uses the same offset for all partitions when creating their state holders.
	 */
-	private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+	private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(
			List<KafkaTopicPartition> partitions,
			long initialOffset,
			int timestampWatermarkMode,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
 
		Map<KafkaTopicPartition, Long> partitionsToInitialOffset = new HashMap<>(partitions.size());
@@ -543,8 +461,7 @@ public abstract class AbstractFetcher<T, KPH> {
		return createPartitionStateHolders(
				partitionsToInitialOffset,
				timestampWatermarkMode,
-				watermarksPeriodic,
-				watermarksPunctuated,
+				watermarkStrategy,
				userCodeClassLoader);
	}
@@ -562,9 +479,9 @@ public abstract class AbstractFetcher<T, KPH> {
	 */
	private void registerOffsetMetrics(
			MetricGroup consumerMetricGroup,
-			List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {
+			List<KafkaTopicPartitionState<T, KPH>> partitionOffsetStates) {
 
-		for (KafkaTopicPartitionState<KPH> ktp : partitionOffsetStates) {
+		for (KafkaTopicPartitionState<T, KPH> ktp : partitionOffsetStates) {
			MetricGroup topicPartitionGroup = consumerMetricGroup
				.addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, ktp.getTopic())
				.addGroup(OFFSETS_BY_PARTITION_METRICS_GROUP, Integer.toString(ktp.getPartition()));
@@ -577,7 +494,7 @@ public abstract class AbstractFetcher<T, KPH> {
		}
	}
 
-	private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?> ktp) {
+	private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?, ?> ktp) {
		return ktp.getTopic() + "-" + ktp.getPartition();
	}
@@ -594,10 +511,10 @@ public abstract class AbstractFetcher<T, KPH> {
	 */
	private static class OffsetGauge implements Gauge<Long> {
 
-		private final KafkaTopicPartitionState<?> ktp;
+		private final KafkaTopicPartitionState<?, ?> ktp;
		private final OffsetGaugeType gaugeType;
 
-		OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
+		OffsetGauge(KafkaTopicPartitionState<?, ?> ktp, OffsetGaugeType gaugeType) {
			this.ktp = ktp;
			this.gaugeType = gaugeType;
		}
@@ -620,30 +537,27 @@ public abstract class AbstractFetcher<T, KPH> {
	 * The periodic watermark emitter. In its given interval, it checks all partitions for
	 * the current event time watermark, and possibly emits the next watermark.
	 */
-	private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {
+	private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeCallback {
 
-		private final List<KafkaTopicPartitionState<KPH>> allPartitions;
+		private final List<KafkaTopicPartitionState<T, KPH>> allPartitions;
 
-		private final SourceContext<?> emitter;
+		private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
 
		private final ProcessingTimeService timerService;
 
		private final long interval;
 
-		private long lastWatermarkTimestamp;
-
		//-------------------------------------------------
 
		PeriodicWatermarkEmitter(
-				List<KafkaTopicPartitionState<KPH>> allPartitions,
-				SourceContext<?> emitter,
+				List<KafkaTopicPartitionState<T, KPH>> allPartitions,
+				WatermarkOutputMultiplexer watermarkOutputMultiplexer,
				ProcessingTimeService timerService,
				long autoWatermarkInterval) {
			this.allPartitions = checkNotNull(allPartitions);
-			this.emitter = checkNotNull(emitter);
+			this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
			this.timerService = checkNotNull(timerService);
			this.interval = autoWatermarkInterval;
-			this.lastWatermarkTimestamp = Long.MIN_VALUE;
		}
 
		//-------------------------------------------------
@@ -653,29 +567,20 @@ public abstract class AbstractFetcher<T, KPH> {
		}
 
		@Override
-		public void onProcessingTime(long timestamp) throws Exception {
+		public void onProcessingTime(long timestamp) {
 
-			long minAcrossAll = Long.MAX_VALUE;
-			boolean isEffectiveMinAggregation = false;
-			for (KafkaTopicPartitionState<KPH> state : allPartitions) {
+			for (KafkaTopicPartitionState<T, KPH> state : allPartitions) {
 
-				// we access the current watermark for the periodic assigners under the state
-				// lock, to prevent concurrent modification to any internal variables
-				final long curr;
+				// we do this under the state lock, to prevent concurrent modification to any
+				// internal variables
				//noinspection SynchronizationOnLocalVariableOrMethodParameter
				synchronized (state) {
-					curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, KPH>) state).getCurrentWatermarkTimestamp();
+					state.onPeriodicEmit();
				}
-
-				minAcrossAll = Math.min(minAcrossAll, curr);
-				isEffectiveMinAggregation = true;
			}
 
-			// emit next watermark, if there is one
-			if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
-				lastWatermarkTimestamp = minAcrossAll;
-				emitter.emitWatermark(new Watermark(minAcrossAll));
-			}
+			watermarkOutputMultiplexer.onPeriodicEmit();
 
			// schedule the next watermark
			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
*/ @Internal -public class KafkaTopicPartitionState { +public class KafkaTopicPartitionState { // ------------------------------------------------------------------------ @@ -106,6 +106,19 @@ public class KafkaTopicPartitionState { return committedOffset; } + public long extractTimestamp(T record, long kafkaEventTimestamp) { + return kafkaEventTimestamp; + } + + public void onEvent(T event, long timestamp) { + // do nothing + } + + public void onPeriodicEmit() { + // do nothing + } + + // ------------------------------------------------------------------------ @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java deleted file mode 100644 index 015ac71e95684c358c286530b33566327cc660b8..0000000000000000000000000000000000000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; - -/** - * A special version of the per-kafka-partition-state that additionally holds - * a periodic watermark generator (and timestamp extractor) per partition. - * - * @param The type of records handled by the watermark generator - * @param The type of the Kafka partition descriptor, which varies across Kafka versions. - */ -@Internal -public final class KafkaTopicPartitionStateWithPeriodicWatermarks extends KafkaTopicPartitionState { - - /** The timestamp assigner and watermark generator for the partition. */ - private final AssignerWithPeriodicWatermarks timestampsAndWatermarks; - - /** The last watermark timestamp generated by this partition. 
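Note how the base class now doubles as a no-op watermark hook: extractTimestamp falls back to the Kafka-provided timestamp, and onEvent/onPeriodicEmit do nothing. This lets the fetcher call the hooks unconditionally, with only the generator-backed subclass doing real work. A rough sketch of the resulting emit path, under the assumption that emitRecordsWithTimestamps (not part of this diff) drives it roughly like this:

    // Hypothetical call site: identical for plain partition state (no-ops)
    // and for KafkaTopicPartitionStateWithWatermarkGenerator (real generator).
    long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
    sourceContext.collectWithTimestamp(record, timestamp);
    partitionState.onEvent(record, timestamp);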
*/ - private long partitionWatermark; - - // ------------------------------------------------------------------------ - - public KafkaTopicPartitionStateWithPeriodicWatermarks( - KafkaTopicPartition partition, KPH kafkaPartitionHandle, - AssignerWithPeriodicWatermarks timestampsAndWatermarks) { - super(partition, kafkaPartitionHandle); - - this.timestampsAndWatermarks = timestampsAndWatermarks; - this.partitionWatermark = Long.MIN_VALUE; - } - - // ------------------------------------------------------------------------ - - public long getTimestampForRecord(T record, long kafkaEventTimestamp) { - return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); - } - - public long getCurrentWatermarkTimestamp() { - Watermark wm = timestampsAndWatermarks.getCurrentWatermark(); - if (wm != null) { - partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp()); - } - return partitionWatermark; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition() - + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; - } -} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java deleted file mode 100644 index e4fe10d9e36fd1921bc1a1001bb203a5e8477245..0000000000000000000000000000000000000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; - -import javax.annotation.Nullable; - -/** - * A special version of the per-kafka-partition-state that additionally holds - * a punctuated watermark generator (and timestamp extractor) per partition. - * - *

This class is not thread safe, but it gives volatile access to the current - * partition watermark ({@link #getCurrentPartitionWatermark()}). - * - * @param The type of records handled by the watermark generator - * @param The type of the Kafka partition descriptor, which varies across Kafka versions - */ -@Internal -public final class KafkaTopicPartitionStateWithPunctuatedWatermarks extends KafkaTopicPartitionState { - - /** The timestamp assigner and watermark generator for the partition. */ - private final AssignerWithPunctuatedWatermarks timestampsAndWatermarks; - - /** The last watermark timestamp generated by this partition. */ - private volatile long partitionWatermark; - - // ------------------------------------------------------------------------ - - public KafkaTopicPartitionStateWithPunctuatedWatermarks( - KafkaTopicPartition partition, KPH kafkaPartitionHandle, - AssignerWithPunctuatedWatermarks timestampsAndWatermarks) { - super(partition, kafkaPartitionHandle); - - this.timestampsAndWatermarks = timestampsAndWatermarks; - this.partitionWatermark = Long.MIN_VALUE; - } - - // ------------------------------------------------------------------------ - - public long getTimestampForRecord(T record, long kafkaEventTimestamp) { - return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); - } - - @Nullable - public Watermark checkAndGetNewWatermark(T record, long timestamp) { - Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp); - if (mark != null && mark.getTimestamp() > partitionWatermark) { - partitionWatermark = mark.getTimestamp(); - return mark; - } - else { - return null; - } - } - - public long getCurrentPartitionWatermark() { - return partitionWatermark; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition() - + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; - } -} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..97e46c01b26dfd37d65d7e81b3ed104ec811386b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; + +/** + * A special version of the per-kafka-partition-state that additionally holds a {@link + * TimestampAssigner}, {@link WatermarkGenerator}, an immediate {@link WatermarkOutput}, and a + * deferred {@link WatermarkOutput} for this partition. + * + *

See {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for an + * explanation of immediate and deferred {@link WatermarkOutput WatermarkOutputs}. + * + * @param The type of records handled by the watermark generator + * @param The type of the Kafka partition descriptor, which varies across Kafka versions. + */ +@Internal +public final class KafkaTopicPartitionStateWithWatermarkGenerator extends KafkaTopicPartitionState { + + private final TimestampAssigner timestampAssigner; + + private final WatermarkGenerator watermarkGenerator; + + /** + * Refer to {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for + * a description of immediate/deferred output. + */ + private final WatermarkOutput immediateOutput; + + /** + * Refer to {@link org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer} for + * a description of immediate/deferred output. + */ + private final WatermarkOutput deferredOutput; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionStateWithWatermarkGenerator( + KafkaTopicPartition partition, KPH kafkaPartitionHandle, + TimestampAssigner timestampAssigner, + WatermarkGenerator watermarkGenerator, + WatermarkOutput immediateOutput, + WatermarkOutput deferredOutput) { + super(partition, kafkaPartitionHandle); + + this.timestampAssigner = timestampAssigner; + this.watermarkGenerator = watermarkGenerator; + this.immediateOutput = immediateOutput; + this.deferredOutput = deferredOutput; + } + + // ------------------------------------------------------------------------ + + @Override + public long extractTimestamp(T record, long kafkaEventTimestamp) { + return timestampAssigner.extractTimestamp(record, kafkaEventTimestamp); + } + + @Override + public void onEvent(T event, long timestamp) { + watermarkGenerator.onEvent(event, timestamp, immediateOutput); + } + + @Override + public void onPeriodicEmit() { + watermarkGenerator.onPeriodicEmit(deferredOutput); + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "KafkaTopicPartitionStateWithWatermarkGenerator: partition=" + getKafkaTopicPartition() + + ", offset=" + getOffset(); + } +} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..c660a96059029dd50b24118cc4e2ab7df3c28fc9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
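Both collaborators of the new partition state are derived from a single deserialized WatermarkStrategy, as the WITH_WATERMARK_GENERATOR branch earlier in this diff shows; the lambda passed in exposes the consumer's metric group to the strategy as its creation context. A sketch of the creation calls used by this patch, in isolation:

    // consumerMetricGroup is the fetcher's metric group; the supplier lambda
    // acts as the strategy's creation context.
    TimestampAssigner<T> assigner =
        watermarkStrategy.createTimestampAssigner(() -> consumerMetricGroup);
    WatermarkGenerator<T> generator =
        watermarkStrategy.createWatermarkGenerator(() -> consumerMetricGroup);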
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +/** + * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link + * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. + */ +public class SourceContextWatermarkOutputAdapter implements WatermarkOutput { + private final SourceContext sourceContext; + + public SourceContextWatermarkOutputAdapter(SourceContext sourceContext) { + this.sourceContext = sourceContext; + } + + @Override + public void emitWatermark(Watermark watermark) { + sourceContext.emitWatermark( + new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp())); + } + + @Override + public void markIdle() { + sourceContext.markAsTemporarilyIdle(); + } +} diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 4501590d1eba097c259d92c654edc5a95b3f547f..2dfe1af784ec9fd25350704eb8440ee5ca6cdc34 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -18,12 +18,11 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -395,8 +394,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { protected AbstractFetcher createFetcher( SourceContext sourceContext, Map thisSubtaskPartitionsWithStartOffsets, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, + SerializedValue> watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 
b63e950a425e557c28eaf0185462e12da5ba4bac..23de3cca7e84b7b1a4b7f65e007dc9b11005c7ed 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; @@ -990,8 +991,24 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { private volatile boolean isRunning = true; - protected TestingFetcher(SourceFunction.SourceContext sourceContext, Map seedPartitionsWithInitialOffsets, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception { - super(sourceContext, seedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, consumerMetricGroup, useMetrics); + protected TestingFetcher( + SourceFunction.SourceContext sourceContext, + Map seedPartitionsWithInitialOffsets, + SerializedValue> watermarkStrategy, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + MetricGroup consumerMetricGroup, + boolean useMetrics) throws Exception { + super( + sourceContext, + seedPartitionsWithInitialOffsets, + watermarkStrategy, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + consumerMetricGroup, + useMetrics); } @Override @@ -1067,8 +1084,15 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @SuppressWarnings("unchecked") - DummyFlinkKafkaConsumer(SupplierWithException, Exception> abstractFetcherSupplier, AbstractPartitionDiscoverer abstractPartitionDiscoverer, long discoveryIntervalMillis) { - this(abstractFetcherSupplier, abstractPartitionDiscoverer, false, discoveryIntervalMillis); + DummyFlinkKafkaConsumer( + SupplierWithException, Exception> abstractFetcherSupplier, + AbstractPartitionDiscoverer abstractPartitionDiscoverer, + long discoveryIntervalMillis) { + this( + abstractFetcherSupplier, + abstractPartitionDiscoverer, + false, + discoveryIntervalMillis); } @SuppressWarnings("unchecked") @@ -1134,12 +1158,10 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @Override - @SuppressWarnings("unchecked") protected AbstractFetcher createFetcher( SourceContext sourceContext, Map thisSubtaskPartitionsWithStartOffsets, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, + SerializedValue> watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, @@ -1184,9 +1206,23 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @Override - protected AbstractFetcher createFetcher(SourceContext sourceContext, Map thisSubtaskPartitionsWithStartOffsets, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws 
Exception { - return new TestingFetcher(sourceContext, thisSubtaskPartitionsWithStartOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), 0L, getClass().getClassLoader(), consumerMetricGroup, useMetrics); - + protected AbstractFetcher createFetcher( + SourceContext sourceContext, + Map thisSubtaskPartitionsWithStartOffsets, + SerializedValue> watermarkStrategy, + StreamingRuntimeContext runtimeContext, + OffsetCommitMode offsetCommitMode, + MetricGroup consumerMetricGroup, + boolean useMetrics) throws Exception { + return new TestingFetcher( + sourceContext, + thisSubtaskPartitionsWithStartOffsets, + watermarkStrategy, + runtimeContext.getProcessingTimeService(), + 0L, + getClass().getClassLoader(), + consumerMetricGroup, + useMetrics); } @Override @@ -1282,8 +1318,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { super( new TestSourceContext<>(), new HashMap<>(), - null, - null, + null /* watermark strategy */, new TestProcessingTimeService(), 0, MockFetcher.class.getClassLoader(), diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java index d668ab52e93aef0279cc1722dd34589ab56f1b70..2a90327cba81423fbb4b276e1e30d206f6c01f10 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java @@ -18,13 +18,11 @@ package org.apache.flink.streaming.connectors.kafka.internals; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -33,19 +31,15 @@ import org.apache.flink.util.SerializedValue; import org.junit.Test; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.ArrayDeque; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -65,12 +59,11 @@ public class AbstractFetcherTest { TestSourceContext sourceContext = new TestSourceContext<>(); TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - null, - null, - new TestProcessingTimeService(), - 0); + sourceContext, + originalPartitions, + null, /* watermark strategy */ + new TestProcessingTimeService(), + 0); synchronized (sourceContext.getCheckpointLock()) { HashMap currentState = fetcher.snapshotCurrentState(); 
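From here on, the tests pass a single nullable SerializedValue<WatermarkStrategy> where they previously passed two nullable assigners. Presumably the fetcher maps null onto the NO_TIMESTAMPS_WATERMARKS mode and a non-null strategy onto WITH_WATERMARK_GENERATOR, matching the two switch cases seen earlier; the selection itself happens in the AbstractFetcher constructor, which this diff does not show. A hypothetical sketch of that mapping:

    // Assumed mode selection (not visible in this diff):
    int timestampWatermarkMode = watermarkStrategy == null
        ? NO_TIMESTAMPS_WATERMARKS
        : WITH_WATERMARK_GENERATOR;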
@@ -103,14 +96,13 @@ public class AbstractFetcherTest { TestSourceContext sourceContext = new TestSourceContext<>(); TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - null, /* periodic watermark assigner */ - null, /* punctuated watermark assigner */ - new TestProcessingTimeService(), - 0); + sourceContext, + originalPartitions, + null, /* watermark strategy */ + new TestProcessingTimeService(), + 0); - final KafkaTopicPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(0); + final KafkaTopicPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(0); emitRecord(fetcher, 1L, partitionStateHolder, 1L); emitRecord(fetcher, 2L, partitionStateHolder, 2L); @@ -123,278 +115,6 @@ public class AbstractFetcherTest { assertEquals(3L, partitionStateHolder.getOffset()); // the offset in state still should have advanced } - @Test - public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { - final String testTopic = "test topic name"; - Map originalPartitions = new HashMap<>(); - originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - - TestSourceContext sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); - - TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - null, /* periodic watermark assigner */ - new SerializedValue>(new PunctuatedTestExtractor()), /* punctuated watermark assigner */ - processingTimeProvider, - 0); - - final KafkaTopicPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(0); - - // elements generate a watermark if the timestamp is a multiple of three - emitRecord(fetcher, 1L, partitionStateHolder, 1L); - emitRecord(fetcher, 2L, partitionStateHolder, 2L); - emitRecord(fetcher, 3L, partitionStateHolder, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - assertTrue(sourceContext.hasWatermark()); - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - assertEquals(3L, partitionStateHolder.getOffset()); - - // emit no records - fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, -1L); - - // no elements or watermarks should have been collected - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - assertFalse(sourceContext.hasWatermark()); - // the offset in state still should have advanced - assertEquals(4L, partitionStateHolder.getOffset()); - } - - @Test - public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception { - final String testTopic = "test topic name"; - Map originalPartitions = new HashMap<>(); - originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - - TestSourceContext sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); - - TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - new SerializedValue>(new PeriodicTestExtractor()), /* periodic watermark assigner */ - null, /* punctuated watermark assigner */ - processingTimeProvider, - 10); - - final KafkaTopicPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(0); - - // elements generate a watermark if 
the timestamp is a multiple of three - emitRecord(fetcher, 1L, partitionStateHolder, 1L); - emitRecord(fetcher, 2L, partitionStateHolder, 2L); - emitRecord(fetcher, 3L, partitionStateHolder, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - assertEquals(3L, partitionStateHolder.getOffset()); - - // advance timer for watermark emitting - processingTimeProvider.setCurrentTime(10L); - assertTrue(sourceContext.hasWatermark()); - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // emit no records - fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, Long.MIN_VALUE); - - // no elements should have been collected - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - // the offset in state still should have advanced - assertEquals(4L, partitionStateHolder.getOffset()); - - // no watermarks should be collected - processingTimeProvider.setCurrentTime(20L); - assertFalse(sourceContext.hasWatermark()); - } - - // ------------------------------------------------------------------------ - // Timestamps & watermarks tests - // ------------------------------------------------------------------------ - - @Test - public void testPunctuatedWatermarks() throws Exception { - final String testTopic = "test topic name"; - Map originalPartitions = new HashMap<>(); - originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - - TestSourceContext sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); - - TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - null, /* periodic watermark assigner */ - new SerializedValue>(new PunctuatedTestExtractor()), - processingTimeProvider, - 0); - - final KafkaTopicPartitionState part1 = fetcher.subscribedPartitionStates().get(0); - final KafkaTopicPartitionState part2 = fetcher.subscribedPartitionStates().get(1); - final KafkaTopicPartitionState part3 = fetcher.subscribedPartitionStates().get(2); - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L); - emitRecord(fetcher, 2L, part1, 2L); - emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - assertFalse(sourceContext.hasWatermark()); - - // elements for partition 2 - emitRecord(fetcher, 12L, part2, 1L); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); - assertFalse(sourceContext.hasWatermark()); - - // elements for partition 3 - emitRecord(fetcher, 101L, part3, 1L); - emitRecord(fetcher, 102L, part3, 2L); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - // now, we should have a watermark - assertTrue(sourceContext.hasWatermark()); - assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - emitRecord(fetcher, 1003L, part3, 3L); - emitRecord(fetcher, 1004L, part3, 4L); - emitRecord(fetcher, 1005L, part3, 5L); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - emitRecord(fetcher, 30L, part1, 4L); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - assertTrue(sourceContext.hasWatermark()); - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - emitRecord(fetcher, 13L, part2, 2L); - assertFalse(sourceContext.hasWatermark()); - emitRecord(fetcher, 14L, part2, 3L); - assertFalse(sourceContext.hasWatermark()); - emitRecord(fetcher, 15L, part2, 3L); - assertTrue(sourceContext.hasWatermark()); - assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp()); - } - - @Test - public void testPeriodicWatermarks() throws Exception { - final String testTopic = "test topic name"; - Map originalPartitions = new HashMap<>(); - originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); - - TestSourceContext sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - - TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - new SerializedValue>(new PeriodicTestExtractor()), - null, /* punctuated watermarks assigner*/ - processingTimeService, - 10); - - final KafkaTopicPartitionState part1 = fetcher.subscribedPartitionStates().get(0); - final KafkaTopicPartitionState part2 = fetcher.subscribedPartitionStates().get(1); - final KafkaTopicPartitionState part3 = fetcher.subscribedPartitionStates().get(2); - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - emitRecord(fetcher, 1L, part1, 1L); - emitRecord(fetcher, 2L, part1, 2L); - emitRecord(fetcher, 3L, part1, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 2 - emitRecord(fetcher, 12L, part2, 1L); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 3 - emitRecord(fetcher, 101L, part3, 1L); - emitRecord(fetcher, 102L, part3, 2L); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - processingTimeService.setCurrentTime(10); - - // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - emitRecord(fetcher, 1003L, part3, 3L); - emitRecord(fetcher, 1004L, part3, 4L); - emitRecord(fetcher, 1005L, part3, 5L); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - 
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - emitRecord(fetcher, 30L, part1, 4L); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - - processingTimeService.setCurrentTime(20); - - // this blocks until the periodic thread emitted the watermark - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - emitRecord(fetcher, 13L, part2, 2L); - emitRecord(fetcher, 14L, part2, 3L); - emitRecord(fetcher, 15L, part2, 3L); - - processingTimeService.setCurrentTime(30); - // this blocks until the periodic thread emitted the watermark - long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); - assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); - } - - @Test - public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception { - final String testTopic = "test topic name"; - Map originalPartitions = new HashMap<>(); - - TestSourceContext sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); - - TestFetcher fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - new SerializedValue>(new PeriodicTestExtractor()), - null, /* punctuated watermarks assigner*/ - processingTimeProvider, - 10); - - processingTimeProvider.setCurrentTime(10); - // no partitions; when the periodic watermark emitter fires, no watermark should be emitted - assertFalse(sourceContext.hasWatermark()); - - // counter-test that when the fetcher does actually have partitions, - // when the periodic watermark emitter fires again, a watermark really is emitted - fetcher.addDiscoveredPartitions(Collections.singletonList(new KafkaTopicPartition(testTopic, 0))); - emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(0), 3L); - processingTimeProvider.setCurrentTime(20); - assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); - } - @Test public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { // test data @@ -402,7 +122,6 @@ public class AbstractFetcherTest { // ----- create the test fetcher ----- - @SuppressWarnings("unchecked") SourceContext sourceContext = new TestSourceContext<>(); Map partitionsWithInitialOffsets = Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); @@ -413,8 +132,7 @@ public class AbstractFetcherTest { final TestFetcher fetcher = new TestFetcher<>( sourceContext, partitionsWithInitialOffsets, - null, /* periodic assigner */ - null, /* punctuated assigner */ + null, /* watermark strategy */ new TestProcessingTimeService(), 10, fetchLoopWaitLatch, @@ -451,16 +169,14 @@ public class AbstractFetcherTest { TestFetcher( SourceContext sourceContext, Map assignedPartitionsWithStartOffsets, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, + SerializedValue> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval) throws Exception { this( sourceContext, assignedPartitionsWithStartOffsets, - watermarksPeriodic, - watermarksPunctuated, + watermarkStrategy, processingTimeProvider, autoWatermarkInterval, null, @@ -470,8 +186,7 @@ public class AbstractFetcherTest { TestFetcher( SourceContext sourceContext, Map assignedPartitionsWithStartOffsets, - SerializedValue> 
watermarksPeriodic, - SerializedValue> watermarksPunctuated, + SerializedValue> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, OneShotLatch fetchLoopWaitLatch, @@ -480,8 +195,7 @@ public class AbstractFetcherTest { super( sourceContext, assignedPartitionsWithStartOffsets, - watermarksPeriodic, - watermarksPunctuated, + watermarkStrategy, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), @@ -499,7 +213,7 @@ public class AbstractFetcherTest { @Override public void runFetchLoop() throws Exception { if (fetchLoopWaitLatch != null) { - for (KafkaTopicPartitionState ignored : subscribedPartitionStates()) { + for (KafkaTopicPartitionState ignored : subscribedPartitionStates()) { fetchLoopWaitLatch.trigger(); stateIterationBlockLatch.await(); } @@ -536,7 +250,7 @@ public class AbstractFetcherTest { private static void emitRecord( AbstractFetcher fetcher, T record, - KafkaTopicPartitionState partitionState, + KafkaTopicPartitionState partitionState, long offset) { ArrayDeque recordQueue = new ArrayDeque<>(); recordQueue.add(record); @@ -548,55 +262,7 @@ public class AbstractFetcherTest { Long.MIN_VALUE); } - private static void emitRecords( - AbstractFetcher fetcher, - List records, - KafkaTopicPartitionState partitionState, - long offset) { - ArrayDeque recordQueue = new ArrayDeque<>(records); - - fetcher.emitRecordsWithTimestamps( - recordQueue, - partitionState, - offset, - Long.MIN_VALUE); - } - private static Queue emptyQueue() { return new ArrayDeque<>(); } - - @SuppressWarnings("deprecation") - private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks { - - private volatile long maxTimestamp = Long.MIN_VALUE; - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - maxTimestamp = Math.max(maxTimestamp, element); - return element; - } - - @Nullable - @Override - public Watermark getCurrentWatermark() { - return new Watermark(maxTimestamp); - } - } - - @SuppressWarnings("deprecation") - private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks { - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - return element; - } - - @Nullable - @Override - public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { - return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null; - } - - } } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e33ea4a31026c707cfd89ef2ba5b5b11b6ddde06 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategies; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.SerializedValue; + +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the watermarking behaviour of {@link AbstractFetcher}. + */ +@SuppressWarnings("serial") +@RunWith(Enclosed.class) +public class AbstractFetcherWatermarksTest { + + /** + * Tests with watermark generators that have a periodic nature. 
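This suite is parameterized so that every periodic test runs twice: once with a legacy AssignerWithPeriodicWatermarks bridged through the adapter strategy, and once with a WatermarkStrategy built on the new WatermarkGenerator API, verifying that both take the same fetcher code path. The two parameter values from getParams(), shown in isolation:

    // Legacy API, bridged into the new WatermarkStrategy interface:
    WatermarkStrategy<Long> legacy =
        new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor());

    // New API, built directly around a WatermarkGenerator:
    WatermarkStrategy<Long> generatorBased = WatermarkStrategies
        .forGenerator(ctx -> new PeriodicTestWatermarkGenerator())
        .withTimestampAssigner((event, previousTimestamp) -> event)
        .build();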
+ */ + @RunWith(Parameterized.class) + public static class PeriodicWatermarksSuite { + + @Parameterized.Parameters + public static Collection> getParams() { + return Arrays.asList( + new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()), + WatermarkStrategies + .forGenerator((ctx) -> new PeriodicTestWatermarkGenerator()) + .withTimestampAssigner((event, previousTimestamp) -> event) + .build() + ); + } + + @Parameterized.Parameter + public WatermarkStrategy testWmStrategy; + + @Test + public void testPeriodicWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 7), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 13), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 21), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<>(testWmStrategy), + processingTimeService, + 10); + + final KafkaTopicPartitionState part1 = + fetcher.subscribedPartitionStates().get(0); + final KafkaTopicPartitionState part2 = + fetcher.subscribedPartitionStates().get(1); + final KafkaTopicPartitionState part3 = + fetcher.subscribedPartitionStates().get(2); + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + emitRecord(fetcher, 1L, part1, 1L); + emitRecord(fetcher, 1L, part1, 1L); + emitRecord(fetcher, 2L, part1, 2L); + emitRecord(fetcher, 3L, part1, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 2 + emitRecord(fetcher, 12L, part2, 1L); + assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 3 + emitRecord(fetcher, 101L, part3, 1L); + emitRecord(fetcher, 102L, part3, 2L); + assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); + + processingTimeService.setCurrentTime(10); + + // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 3 + emitRecord(fetcher, 1003L, part3, 3L); + emitRecord(fetcher, 1004L, part3, 4L); + emitRecord(fetcher, 1005L, part3, 5L); + assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); + + // advance partition 1 beyond partition 2 - this bumps the watermark + emitRecord(fetcher, 30L, part1, 4L); + assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); + + processingTimeService.setCurrentTime(20); + + // this blocks until the periodic thread emitted the watermark + assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 2 again - this bumps the watermark + emitRecord(fetcher, 13L, part2, 2L); + 
emitRecord(fetcher, 14L, part2, 3L); + emitRecord(fetcher, 15L, part2, 3L); + + processingTimeService.setCurrentTime(30); + // this blocks until the periodic thread emitted the watermark + long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); + assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); + } + + @Test + public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 1), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<>(testWmStrategy), + processingTimeProvider, + 10); + + final KafkaTopicPartitionState partitionStateHolder = + fetcher.subscribedPartitionStates().get(0); + + // elements generate a watermark if the timestamp is a multiple of three + emitRecord(fetcher, 1L, partitionStateHolder, 1L); + emitRecord(fetcher, 2L, partitionStateHolder, 2L); + emitRecord(fetcher, 3L, partitionStateHolder, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + assertEquals(3L, partitionStateHolder.getOffset()); + + // advance timer for watermark emitting + processingTimeProvider.setCurrentTime(10L); + assertTrue(sourceContext.hasWatermark()); + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // emit no records + fetcher.emitRecordsWithTimestamps( + emptyQueue(), + partitionStateHolder, + 4L, + Long.MIN_VALUE); + + // no elements should have been collected + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + // the offset in state still should have advanced + assertEquals(4L, partitionStateHolder.getOffset()); + + // no watermarks should be collected + processingTimeProvider.setCurrentTime(20L); + assertFalse(sourceContext.hasWatermark()); + } + + @Test + public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<>(testWmStrategy), + processingTimeProvider, + 10); + + processingTimeProvider.setCurrentTime(10); + // no partitions; when the periodic watermark emitter fires, no watermark should be emitted + assertFalse(sourceContext.hasWatermark()); + + // counter-test that when the fetcher does actually have partitions, + // when the periodic watermark emitter fires again, a watermark really is emitted + fetcher.addDiscoveredPartitions(Collections.singletonList( + new KafkaTopicPartition(testTopic, 0))); + emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(0), 3L); + processingTimeProvider.setCurrentTime(20); + assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); + } + } + + /** + * Tests with watermark generators that have a punctuated nature. 
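Punctuated behaviour is exercised only through the legacy bridge: the new API has no dedicated punctuated generator type, since a WatermarkGenerator that emits from onEvent covers that case. The strategy both tests below construct:

    // Legacy punctuated assigner, bridged to a WatermarkStrategy:
    AssignerWithPunctuatedWatermarksAdapter.Strategy<Long> punctuated =
        new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor());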
+ */ + public static class PunctuatedWatermarksSuite { + + @Test + public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 1), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + + AssignerWithPunctuatedWatermarksAdapter.Strategy testWmStrategy = + new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor()); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<>(testWmStrategy), + processingTimeProvider, + 0); + + final KafkaTopicPartitionState partitionStateHolder = + fetcher.subscribedPartitionStates().get(0); + + // elements generate a watermark if the timestamp is a multiple of three + emitRecord(fetcher, 1L, partitionStateHolder, 1L); + emitRecord(fetcher, 2L, partitionStateHolder, 2L); + emitRecord(fetcher, 3L, partitionStateHolder, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + assertTrue(sourceContext.hasWatermark()); + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + assertEquals(3L, partitionStateHolder.getOffset()); + + // emit no records + fetcher.emitRecordsWithTimestamps(emptyQueue(), partitionStateHolder, 4L, -1L); + + // no elements or watermarks should have been collected + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + assertFalse(sourceContext.hasWatermark()); + // the offset in state still should have advanced + assertEquals(4L, partitionStateHolder.getOffset()); + } + + @Test + public void testPunctuatedWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 7), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 13), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + originalPartitions.put( + new KafkaTopicPartition(testTopic, 21), + KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + + AssignerWithPunctuatedWatermarksAdapter.Strategy testWmStrategy = + new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor()); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<>(testWmStrategy), + processingTimeProvider, + 0); + + final KafkaTopicPartitionState part1 = + fetcher.subscribedPartitionStates().get(0); + final KafkaTopicPartitionState part2 = + fetcher.subscribedPartitionStates().get(1); + final KafkaTopicPartitionState part3 = + fetcher.subscribedPartitionStates().get(2); + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L); + emitRecord(fetcher, 2L, part1, 2L); + emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, 
+		@Test
+		public void testPunctuatedWatermarks() throws Exception {
+			final String testTopic = "test topic name";
+			Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+			originalPartitions.put(
+				new KafkaTopicPartition(testTopic, 7),
+				KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+			originalPartitions.put(
+				new KafkaTopicPartition(testTopic, 13),
+				KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+			originalPartitions.put(
+				new KafkaTopicPartition(testTopic, 21),
+				KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+			TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+			TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+			AssignerWithPunctuatedWatermarksAdapter.Strategy<Long> testWmStrategy =
+				new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(new PunctuatedTestExtractor());
+
+			TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				new SerializedValue<>(testWmStrategy),
+				processingTimeProvider,
+				0);
+
+			final KafkaTopicPartitionState<Long, Object> part1 =
+				fetcher.subscribedPartitionStates().get(0);
+			final KafkaTopicPartitionState<Long, Object> part2 =
+				fetcher.subscribedPartitionStates().get(1);
+			final KafkaTopicPartitionState<Long, Object> part3 =
+				fetcher.subscribedPartitionStates().get(2);
+
+			// elements generate a watermark if the timestamp is a multiple of three
+
+			// elements for partition 1
+			emitRecords(fetcher, Arrays.asList(1L, 2L), part1, 1L);
+			emitRecord(fetcher, 2L, part1, 2L);
+			emitRecords(fetcher, Arrays.asList(2L, 3L), part1, 3L);
+			assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+			assertFalse(sourceContext.hasWatermark());
+
+			// elements for partition 2
+			emitRecord(fetcher, 12L, part2, 1L);
+			assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+			assertFalse(sourceContext.hasWatermark());
+
+			// elements for partition 3
+			emitRecord(fetcher, 101L, part3, 1L);
+			emitRecord(fetcher, 102L, part3, 2L);
+			assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+			// now, we should have a watermark
+			assertTrue(sourceContext.hasWatermark());
+			assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+			// advance partition 3
+			emitRecord(fetcher, 1003L, part3, 3L);
+			emitRecord(fetcher, 1004L, part3, 4L);
+			emitRecord(fetcher, 1005L, part3, 5L);
+			assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+			// advance partition 1 beyond partition 2 - this bumps the watermark
+			emitRecord(fetcher, 30L, part1, 4L);
+			assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+			assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+			assertTrue(sourceContext.hasWatermark());
+			assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+			// advance partition 2 again - this bumps the watermark
+			emitRecord(fetcher, 13L, part2, 2L);
+			assertFalse(sourceContext.hasWatermark());
+			emitRecord(fetcher, 14L, part2, 3L);
+			assertFalse(sourceContext.hasWatermark());
+			emitRecord(fetcher, 15L, part2, 3L);
+			assertTrue(sourceContext.hasWatermark());
+			assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+		}
+	}
+
+	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+		TestFetcher(
+				SourceContext<T> sourceContext,
+				Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
+				SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+				ProcessingTimeService processingTimeProvider,
+				long autoWatermarkInterval) throws Exception {
+			super(
+				sourceContext,
+				assignedPartitionsWithStartOffsets,
+				watermarkStrategy,
+				processingTimeProvider,
+				autoWatermarkInterval,
+				TestFetcher.class.getClassLoader(),
+				new UnregisteredMetricsGroup(),
+				false);
+		}
+
+		public void runFetchLoop() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected void doCommitInternalOffsetsToKafka(
+				Map<KafkaTopicPartition, Long> offsets,
+				@Nonnull KafkaCommitCallback commitCallback) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			return new Object();
+		}
+	}
+
+	private static <T, KPH> void emitRecord(
+			AbstractFetcher<T, KPH> fetcher,
+			T record,
+			KafkaTopicPartitionState<T, KPH> partitionState,
+			long offset) {
+		ArrayDeque<T> recordQueue = new ArrayDeque<>();
+		recordQueue.add(record);
+
+		fetcher.emitRecordsWithTimestamps(
+			recordQueue,
+			partitionState,
+			offset,
+			Long.MIN_VALUE);
+	}
+
+	private static <T, KPH> void emitRecords(
+			AbstractFetcher<T, KPH> fetcher,
+			List<T> records,
+			KafkaTopicPartitionState<T, KPH> partitionState,
+			long offset) {
+		ArrayDeque<T> recordQueue = new ArrayDeque<>(records);
+
+		fetcher.emitRecordsWithTimestamps(
+			recordQueue,
+			partitionState,
+			offset,
+			Long.MIN_VALUE);
+	}
+
+	private static Queue<Long> emptyQueue() {
+		return new ArrayDeque<>();
+	}
+
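The `@SuppressWarnings("deprecation")` annotations below are needed because the extractor classes implement the deprecated assigner interfaces. The periodic side has a matching bridge, `AssignerWithPeriodicWatermarksAdapter.Strategy`; a hedged sketch follows, where the adapter name and package are assumed to parallel the punctuated adapter used above, and the assigner class is invented but behaves like `PeriodicTestExtractor`:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;

import javax.annotation.Nullable;

public final class PeriodicAdapterExample {

    // Legacy periodic assigner with the same behavior as PeriodicTestExtractor below.
    @SuppressWarnings("deprecation")
    static final class MaxTimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Long element, long previousElementTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, element);
            return element;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestamp);
        }
    }

    // Wrap the legacy assigner so it can be handed to the fetcher as a WatermarkStrategy.
    @SuppressWarnings("deprecation")
    static WatermarkStrategy<Long> strategy() {
        return new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new MaxTimestampAssigner());
    }
}
```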
+	@SuppressWarnings("deprecation")
+	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			maxTimestamp = Math.max(maxTimestamp, element);
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return new Watermark(maxTimestamp);
+		}
+	}
+
+	@SuppressWarnings("deprecation")
+	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+		}
+	}
+
+	private static class PeriodicTestWatermarkGenerator implements WatermarkGenerator<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+
+		@Override
+		public void onEvent(
+				Long event, long eventTimestamp, WatermarkOutput output) {
+			maxTimestamp = Math.max(maxTimestamp, event);
+		}
+
+		@Override
+		public void onPeriodicEmit(WatermarkOutput output) {
+			output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(maxTimestamp));
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index dc58594f6e4f9be7666ed409da05f0455c53f712..4b547a8d0ae8c9ea9be08d33f92f037df2ef50e0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -18,10 +18,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher;
@@ -211,8 +210,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
 			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
 			StreamingRuntimeContext runtimeContext,
 			OffsetCommitMode offsetCommitMode,
 			MetricGroup consumerMetricGroup,
@@ -225,8 +223,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 		return new KafkaFetcher<>(
 			sourceContext,
 			assignedPartitionsWithInitialOffsets,
-			watermarksPeriodic,
-			watermarksPunctuated,
+			watermarkStrategy,
 			runtimeContext.getProcessingTimeService(),
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
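The `createFetcher` change above is the internal half of the migration; the user-facing half is that the consumer now takes a single `WatermarkStrategy` in place of the periodic/punctuated pair. A minimal sketch, assuming the `assignTimestampsAndWatermarks(WatermarkStrategy)` overload on `FlinkKafkaConsumerBase` that accompanies this migration; the topic name and helper class are hypothetical:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public final class ConsumerWatermarkExample {

    static FlinkKafkaConsumer<String> create(Properties kafkaProps) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);

        // One WatermarkStrategy instead of the old periodic/punctuated pair;
        // internally it is serialized and handed to createFetcher as the
        // single watermarkStrategy argument seen in the hunk above.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));
        return consumer;
    }
}
```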
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 2b44bb96064da86709addc1352629f3340482b76..51bec0d5bdd3c60d818f029d903ecb7176ec4bbe 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * to the KafkaConsumer calls that change signature.
  */
 @Internal
-public class KafkaConsumerThread extends Thread {
+public class KafkaConsumerThread<T> extends Thread {
 
 	/** Logger for this consumer. */
 	private final Logger log;
@@ -79,7 +79,7 @@ public class KafkaConsumerThread extends Thread {
 	private final Properties kafkaProperties;
 
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue;
 
 	/** The maximum number of milliseconds to wait for a fetch batch. */
 	private final long pollTimeout;
@@ -122,7 +122,7 @@ public class KafkaConsumerThread extends Thread {
 			Logger log,
 			Handover handover,
 			Properties kafkaProperties,
-			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
+			ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue,
 			String threadName,
 			long pollTimeout,
 			boolean useMetrics,
@@ -203,7 +203,7 @@ public class KafkaConsumerThread extends Thread {
 		// reused variable to hold found unassigned new partitions.
 		// found partitions are not carried across loops using this variable;
 		// they are carried across via re-adding them to the unassigned partitions queue
-		List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
+		List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions;
 
 		// main fetch loop
 		while (running) {
@@ -370,7 +370,7 @@ public class KafkaConsumerThread extends Thread {
 	 *
 	 * <p>This method is exposed for testing purposes.
 	 */
 	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+	void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions) throws Exception {
 		if (newPartitions.size() == 0) {
 			return;
 		}
@@ -412,7 +412,7 @@ public class KafkaConsumerThread extends Thread {
 			//   been replaced with actual offset values yet, or
 			//   (3) the partition was newly discovered after startup;
 			// replace those with actual offsets, according to what the sentinel value represent.
-			for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
+			for (KafkaTopicPartitionState<T, TopicPartition> newPartitionState : newPartitions) {
 				if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
 					consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
 					newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
@@ -451,7 +451,7 @@ public class KafkaConsumerThread extends Thread {
 			hasBufferedWakeup = false;
 
 			// re-add all new partitions back to the unassigned partitions queue to be picked up again
-			for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
+			for (KafkaTopicPartitionState<T, TopicPartition> newPartition : newPartitions) {
 				unassignedPartitionsQueue.add(newPartition);
 			}
@@ -481,9 +481,9 @@ public class KafkaConsumerThread extends Thread {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<TopicPartition>> partitions) {
+	private static <T> List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> partitions) {
 		ArrayList<TopicPartition> result = new ArrayList<>(partitions.size());
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+		for (KafkaTopicPartitionState<T, TopicPartition> p : partitions) {
 			result.add(p.getKafkaPartitionHandle());
 		}
 		return result;
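The `reassignPartitions` hunk above resolves sentinel offsets by seeking and then storing `position - 1`. A minimal sketch of that off-by-one convention against the plain Kafka consumer API; the class and method names are illustrative, but `seekToBeginning` and `position` are the real `Consumer` calls:

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class SentinelSeekExample {

    // Mirrors the sentinel handling above: a partition restored with the
    // EARLIEST_OFFSET sentinel has no concrete offset yet, so the thread seeks
    // to the beginning and records position - 1. The "- 1" matters because
    // partition state stores the offset of the last consumed record, while
    // Kafka's position() is the offset of the next record to fetch.
    static long resolveEarliestOffset(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
        consumer.seekToBeginning(Collections.singletonList(tp));
        return consumer.position(tp) - 1;
    }
}
```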
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
index 3215b5bb84ba1bf1c47be7877db70106c3319454..d591f584427c7463fd7eb7364a3b50896a397bf9 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
@@ -18,9 +18,8 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -78,8 +77,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	public KafkaFetcher(
 		SourceFunction.SourceContext<T> sourceContext,
 		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
-		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+		SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
 		ProcessingTimeService processingTimeProvider,
 		long autoWatermarkInterval,
 		ClassLoader userCodeClassLoader,
@@ -93,8 +91,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		super(
 			sourceContext,
 			assignedPartitionsWithInitialOffsets,
-			watermarksPeriodic,
-			watermarksPunctuated,
+			watermarkStrategy,
 			processingTimeProvider,
 			autoWatermarkInterval,
 			userCodeClassLoader,
@@ -136,7 +133,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 				// get the records for each topic partition
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
+				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
 
 					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 						records.records(partition.getKafkaPartitionHandle());
@@ -207,11 +204,11 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 
 		@SuppressWarnings("unchecked")
-		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
+		List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();
 
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());
 
-		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+		for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
 			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
 			if (lastProcessedOffset != null) {
 				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
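The commit path in the last hunk is truncated here; the convention it feeds into, well established in Kafka's API, is that a committed offset names the *next* record to read. A sketch of that `+ 1` translation, with hypothetical helper names, mirroring the `offsetsToCommit` map built in `doCommitInternalOffsetsToKafka`:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class CommitOffsetExample {

    // Kafka expects the offset of the next record to read, so a checkpointed
    // "last processed" offset is committed as lastProcessedOffset + 1.
    static Map<TopicPartition, OffsetAndMetadata> toCommittable(
            Map<TopicPartition, Long> lastProcessedOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                new HashMap<>(lastProcessedOffsets.size());
        for (Map.Entry<TopicPartition, Long> e : lastProcessedOffsets.entrySet()) {
            if (e.getValue() >= 0) { // mirrors the checkState guard above
                offsetsToCommit.put(e.getKey(), new OffsetAndMetadata(e.getValue() + 1));
            }
        }
        return offsetsToCommit;
    }
}
```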