提交 d20728ba 编写于 作者: P Piotr Nowojski 提交者: Aljoscha Krettek

[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery

上级 7a35c356
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
* Wrapper around KafkaProducer that allows to resume transactions in case of node failure, which allows to implement
* two phase commit algorithm for exactly-once semantic FlinkKafkaProducer.
* <p>For happy path usage is exactly the same as {@link org.apache.kafka.clients.producer.KafkaProducer}. User is
* expected to call:
* <ul>
* <li>{@link FlinkKafkaProducer#initTransactions()}</li>
* <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
* <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
* <li>{@link FlinkKafkaProducer#flush()}</li>
* <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
* </ul>
* <p>To actually implement two phase commit, it must be possible to always commit a transaction after pre-committing
* it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of some failure between
* {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()} this class allows to resume
* interrupted transaction and commit if after a restart:
* <ul>
* <li>{@link FlinkKafkaProducer#initTransactions()}</li>
* <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
* <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
* <li>{@link FlinkKafkaProducer#flush()}</li>
* <li>{@link FlinkKafkaProducer#getProducerId()}</li>
* <li>{@link FlinkKafkaProducer#getEpoch()}</li>
* <li>node failure... restore producerId and epoch from state</li>
* <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
* <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
* </ul>
* <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
* as a way to obtain the producerId and epoch counters. It has to be done, because otherwise
* {@link FlinkKafkaProducer#initTransactions()} would automatically abort all on going transactions.
* <p>Second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
* is that this one actually flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on
* {@link FlinkKafkaProducer#commitTransaction()}.
* <p>The last one minor difference is that it allows to obtain the producerId and epoch counters via
* {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (which are unfortunately
* private fields).
* <p>Those changes are compatible with Kafka's 0.11.0 REST API although it clearly was not the intention of the Kafka's
* API authors to make them possible.
* <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements
* required changes via Java Reflection API. It might not be the prettiest solution. An alternative would be to
* re-implement whole Kafka's 0.11 REST API client on our own.
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
private final KafkaProducer<K, V> kafkaProducer;
private final String transactionalId;
public FlinkKafkaProducer(Properties properties) {
transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
kafkaProducer = new KafkaProducer<>(properties);
// -------------------------------- Simple proxy method calls --------------------------------
public void initTransactions() {
public void beginTransaction() throws ProducerFencedException {
public void commitTransaction() throws ProducerFencedException {
public void abortTransaction() throws ProducerFencedException {
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return kafkaProducer.send(record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return kafkaProducer.send(record, callback);
public List<PartitionInfo> partitionsFor(String topic) {
return kafkaProducer.partitionsFor(topic);
public Map<MetricName, ? extends Metric> metrics() {
return kafkaProducer.metrics();
public void close() {
public void close(long timeout, TimeUnit unit) {
kafkaProducer.close(timeout, unit);
// -------------------------------- New methods or methods with changed behaviour --------------------------------
public void flush() {
if (transactionalId != null) {
public void resumeTransaction(long producerId, short epoch) {
Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
Object transactionManager = getValue(kafkaProducer, "transactionManager");
Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
invoke(sequenceNumbers, "clear");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
setValue(producerIdAndEpoch, "producerId", producerId);
setValue(producerIdAndEpoch, "epoch", epoch);
invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
setValue(transactionManager, "transactionStarted", true);
public String getTransactionalId() {
return transactionalId;
public long getProducerId() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
return (long) getValue(producerIdAndEpoch, "producerId");
public short getEpoch() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
return (short) getValue(producerIdAndEpoch, "epoch");
public int getTransactionCoordinatorId() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
return node.id();
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
Object transactionManager = getValue(kafkaProducer, "transactionManager");
Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
Object sender = getValue(kafkaProducer, "sender");
invoke(sender, "wakeup");
private static Enum<?> getEnum(String enumFullName) {
String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
if (x.length == 2) {
String enumClassName = x[0];
String enumName = x[1];
try {
Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
return null;
private static Object invoke(Object object, String methodName, Object... args) {
Class<?>[] argTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
return invoke(object, methodName, argTypes, args);
private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
try {
Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
return method.invoke(object, args);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
private static Object getValue(Object object, String fieldName) {
return getValue(object, object.getClass(), fieldName);
private static Object getValue(Object object, Class<?> clazz, String fieldName) {
try {
Field field = clazz.getDeclaredField(fieldName);
return field.get(object);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
private static void setValue(Object object, String fieldName, Object value) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.set(object, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
* Tests for our own {@link FlinkKafkaProducer}.
public class FlinkKafkaProducerTests extends KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
public void before() {
transactionalId = UUID.randomUUID().toString();
extraProperties = new Properties();
extraProperties.put("transactional.id", transactionalId);
extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
extraProperties.put("isolation.level", "read_committed");
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
String topicName = "flink-kafka-producer-happy-path";
try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
assertRecord(topicName, "42", "42");
@Test(timeout = 30000L)
public void testResumeTransaction() throws IOException {
String topicName = "flink-kafka-producer-resume-transaction";
try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
long producerId = kafkaProducer.getProducerId();
short epoch = kafkaProducer.getEpoch();
try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
resumeProducer.resumeTransaction(producerId, epoch);
assertRecord(topicName, "42", "42");
// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
// this shouldn't fail also, for same reason as above
try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
resumeProducer.resumeTransaction(producerId, epoch);
private void assertRecord(String topicName, String expectedKey, String expectedValue) {
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
assertEquals(expectedKey, record.key());
assertEquals(expectedValue, record.value());
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册