未验证 提交 a9ca80bb 编写于 作者: A Alexey Trenikhin 提交者: Aljoscha Krettek

[FLINK-8354] Refactor AUTO_COMMIT config in Kafka consumers

This moves the code to a method in the base class.
上级 6051f5cb
......@@ -34,7 +34,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
......@@ -190,9 +189,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
adjustAutoCommitConfig(properties, offsetCommitMode);
return new Kafka010Fetcher<>(
sourceContext,
......
......@@ -243,9 +243,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
adjustAutoCommitConfig(properties, offsetCommitMode);
return new Kafka09Fetcher<>(
sourceContext,
......
......@@ -54,6 +54,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -62,6 +63,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
......@@ -248,6 +250,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
this.useMetrics = useMetrics;
}
/**
* Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
* This overwrites whatever setting the user configured in the properties.
* @param properties - Kafka configuration properties to be adjusted
* @param offsetCommitMode offset commit mode
*/
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
......
......@@ -221,9 +221,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
adjustAutoCommitConfig(properties, offsetCommitMode);
return new KafkaFetcher<>(
sourceContext,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册