diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index b446748c2842d297a3d0d3c172d10305ac275a09..f06a6c5dd32a13e9bbd6f58700e814b9383b3de8 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -130,9 +130,11 @@ public abstract class AbstractFetcher { */ private final MetricGroup consumerMetricGroup; + @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated private final MetricGroup legacyCurrentOffsetsMetricGroup; + @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated private final MetricGroup legacyCommittedOffsetsMetricGroup; @@ -185,7 +187,7 @@ public abstract class AbstractFetcher { userCodeClassLoader); // check that all seed partition states have a defined offset - for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { + for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { if (!partitionState.isOffsetDefined()) { throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets."); } @@ -204,7 +206,7 @@ public abstract class AbstractFetcher { // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { @SuppressWarnings("unchecked") - PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter( + PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter( subscribedPartitionStates, sourceContext, processingTimeProvider, @@ -303,7 +305,7 @@ public abstract class AbstractFetcher { return offsets.entrySet() .stream() .filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue())) - .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } /** diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java index e662a062c528f48f37407208cec4808d2a06f021..d668ab52e93aef0279cc1722dd34589ab56f1b70 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java @@ -443,7 +443,7 @@ public class AbstractFetcherTest { // ------------------------------------------------------------------------ private static final class TestFetcher extends AbstractFetcher { - Optional> lastCommittedOffsets = Optional.empty(); + Map lastCommittedOffsets = null; private final OneShotLatch fetchLoopWaitLatch; private final OneShotLatch stateIterationBlockLatch; @@ -499,7 +499,7 @@ public class AbstractFetcherTest { @Override public void runFetchLoop() throws Exception { if (fetchLoopWaitLatch != null) { - for (KafkaTopicPartitionState ignored : subscribedPartitionStates()) { + for (KafkaTopicPartitionState ignored : subscribedPartitionStates()) { fetchLoopWaitLatch.trigger(); stateIterationBlockLatch.await(); } @@ -521,13 +521,13 @@ public class AbstractFetcherTest { @Override protected void doCommitInternalOffsetsToKafka( Map offsets, - @Nonnull KafkaCommitCallback callback) throws Exception { - lastCommittedOffsets = Optional.of(offsets); + @Nonnull KafkaCommitCallback callback) { + lastCommittedOffsets = offsets; callback.onSuccess(); } public Optional> getLastCommittedOffsets() { - return lastCommittedOffsets; + return Optional.ofNullable(lastCommittedOffsets); } } @@ -537,7 +537,7 @@ public class AbstractFetcherTest { AbstractFetcher fetcher, T record, KafkaTopicPartitionState partitionState, - long offset) throws Exception { + long offset) { ArrayDeque recordQueue = new ArrayDeque<>(); recordQueue.add(record); @@ -552,7 +552,7 @@ public class AbstractFetcherTest { AbstractFetcher fetcher, List records, KafkaTopicPartitionState partitionState, - long offset) throws Exception { + long offset) { ArrayDeque recordQueue = new ArrayDeque<>(records); fetcher.emitRecordsWithTimestamps( @@ -566,6 +566,7 @@ public class AbstractFetcherTest { return new ArrayDeque<>(); } + @SuppressWarnings("deprecation") private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks { private volatile long maxTimestamp = Long.MIN_VALUE; @@ -583,6 +584,7 @@ public class AbstractFetcherTest { } } + @SuppressWarnings("deprecation") private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks { @Override