[minor] Fix warnings in AbstractFetcher(Test)

上级 8e5685d3
......@@ -130,9 +130,11 @@ public abstract class AbstractFetcher<T, KPH> {
*/
private final MetricGroup consumerMetricGroup;
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private final MetricGroup legacyCurrentOffsetsMetricGroup;
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private final MetricGroup legacyCommittedOffsetsMetricGroup;
......@@ -185,7 +187,7 @@ public abstract class AbstractFetcher<T, KPH> {
userCodeClassLoader);
// check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
for (KafkaTopicPartitionState<?> partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
......@@ -204,7 +206,7 @@ public abstract class AbstractFetcher<T, KPH> {
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@SuppressWarnings("unchecked")
PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
PeriodicWatermarkEmitter<KPH> periodicEmitter = new PeriodicWatermarkEmitter(
subscribedPartitionStates,
sourceContext,
processingTimeProvider,
......@@ -303,7 +305,7 @@ public abstract class AbstractFetcher<T, KPH> {
return offsets.entrySet()
.stream()
.filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
......
......@@ -443,7 +443,7 @@ public class AbstractFetcherTest {
// ------------------------------------------------------------------------
private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty();
Map<KafkaTopicPartition, Long> lastCommittedOffsets = null;
private final OneShotLatch fetchLoopWaitLatch;
private final OneShotLatch stateIterationBlockLatch;
......@@ -499,7 +499,7 @@ public class AbstractFetcherTest {
@Override
public void runFetchLoop() throws Exception {
if (fetchLoopWaitLatch != null) {
for (KafkaTopicPartitionState ignored : subscribedPartitionStates()) {
for (KafkaTopicPartitionState<?> ignored : subscribedPartitionStates()) {
fetchLoopWaitLatch.trigger();
stateIterationBlockLatch.await();
}
......@@ -521,13 +521,13 @@ public class AbstractFetcherTest {
@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback callback) throws Exception {
lastCommittedOffsets = Optional.of(offsets);
@Nonnull KafkaCommitCallback callback) {
lastCommittedOffsets = offsets;
callback.onSuccess();
}
public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
return lastCommittedOffsets;
return Optional.ofNullable(lastCommittedOffsets);
}
}
......@@ -537,7 +537,7 @@ public class AbstractFetcherTest {
AbstractFetcher<T, KPH> fetcher,
T record,
KafkaTopicPartitionState<KPH> partitionState,
long offset) throws Exception {
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>();
recordQueue.add(record);
......@@ -552,7 +552,7 @@ public class AbstractFetcherTest {
AbstractFetcher<T, KPH> fetcher,
List<T> records,
KafkaTopicPartitionState<KPH> partitionState,
long offset) throws Exception {
long offset) {
ArrayDeque<T> recordQueue = new ArrayDeque<>(records);
fetcher.emitRecordsWithTimestamps(
......@@ -566,6 +566,7 @@ public class AbstractFetcherTest {
return new ArrayDeque<>();
}
@SuppressWarnings("deprecation")
private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
......@@ -583,6 +584,7 @@ public class AbstractFetcherTest {
}
}
@SuppressWarnings("deprecation")
private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册