From fa403a9998328a81ec6c3c785e842526f6abba0a Mon Sep 17 00:00:00 2001 From: Alexey Trenikhin Date: Wed, 20 Feb 2019 14:03:26 +0100 Subject: [PATCH] [FLINK-8354] Fix formatting in FlinkKafkaConsumerBase --- .../connectors/kafka/FlinkKafkaConsumerBase.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 196b5c08af3..bf8f29e7e27 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -497,7 +497,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti // restored partitions that should not be subscribed by this subtask if (KafkaTopicPartitionAssigner.assign( restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks()) - == getRuntimeContext().getIndexOfThisSubtask()){ + == getRuntimeContext().getIndexOfThisSubtask()){ subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); } } else { @@ -545,16 +545,16 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti } for (Map.Entry partitionToOffset - : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) { + : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) { subscribedPartitionsToStartOffsets.put( partitionToOffset.getKey(), (partitionToOffset.getValue() == null) - // if an offset cannot be retrieved for a partition with the given timestamp, - // we default to using the latest offset for the partition - ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET - // since the specified offsets represent the next record to read, we subtract - // it by one so that the initial state of the consumer will be correct - : partitionToOffset.getValue() - 1); + // if an offset cannot be retrieved for a partition with the given timestamp, + // we default to using the latest offset for the partition + ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET + // since the specified offsets represent the next record to read, we subtract + // it by one so that the initial state of the consumer will be correct + : partitionToOffset.getValue() - 1); } break; -- GitLab