Commit 6886f638 authored by Aljoscha Krettek

[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010

Parent e7996b0d
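
Usage note (not part of the diff): with the new SinkFunction interface the 0.10 producer no longer needs the custom-operator path; it can be attached with a plain DataStream.addSink() call and read the record timestamp from the SinkFunction.Context. The sketch below is illustrative only — the topic name, broker address, SimpleStringSchema and the stream variable are assumptions, not code from this commit.

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
            "my-topic",                  // placeholder topic
            new SimpleStringSchema(),    // keyless serialization schema
            props);

    // The record's timestamp is now obtained via SinkFunction.Context, so enabling
    // timestamp writing is just a flag on the sink itself.
    producer.setWriteTimestampToKafka(true);

    stream.addSink(producer);            // replaces FlinkKafkaProducer010.writeToKafkaWithTimestamps(...)
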
@@ -17,24 +17,13 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -43,32 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
/**
* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
*
* <p>Implementation note: This producer is a hybrid between a regular sink function (a)
* and a custom operator (b).
*
* <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
* For (b), it extends the StreamTask class.
*
* <p>Details about approach (a):
* Pre-0.10 Kafka producers only follow approach (a), allowing users to add the producer with the
* DataStream.addSink() method.
* Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
* the Kafka 0.10 producer has a second invocation option, approach (b).
*
* <p>Details about approach (b):
* Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
* FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
* can access the internal record timestamp of the record and write it to Kafka.
*
* <p>All methods and constructors in this class are marked with the approach they are needed for.
*/
public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -87,7 +54,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
@@ -105,7 +76,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined (keyless) serialization schema.
* @param producerConfig Properties with the producer configuration.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
SerializationSchema<T> serializationSchema,
@@ -124,20 +99,24 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<T> customPartitioner) {
GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
}
// ---------------------- Regular constructors w/o timestamp support ------------------
// ---------------------- Regular constructors------------------
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -220,9 +199,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above)
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
// ----------------------------- Deprecated constructors / factory methods ---------------------------
@@ -250,11 +227,10 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
Properties producerConfig,
KafkaPartitioner<T> customPartitioner) {
GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer =
new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer);
}
/**
@@ -288,157 +264,75 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
// ----------------------------- Generic element processing ---------------------------
/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
*
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.writeTimestampToKafka = writeTimestampToKafka;
}
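
As the Javadoc above notes, timestamps must be positive for Kafka to accept them. A rough sketch of ensuring that before enabling the flag, assuming a hypothetical MyEvent type with a getEventTime() accessor returning epoch milliseconds (the events and producer variables are likewise assumed):

    DataStream<MyEvent> withTimestamps = events
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
                @Override
                public long extractAscendingTimestamp(MyEvent element) {
                    return element.getEventTime();   // must be a positive epoch-millis value
                }
            });

    producer.setWriteTimestampToKafka(true);   // the attached timestamp is then written to Kafka
    withTimestamps.addSink(producer);
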
private void invokeInternal(T next, long elementTimestamp) throws Exception {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
// ----------------------------- Generic element processing ---------------------------
internalProducer.checkErroneous();
@Override
public void invoke(T value, Context context) throws Exception {
byte[] serializedKey = internalProducer.schema.serializeKey(next);
byte[] serializedValue = internalProducer.schema.serializeValue(next);
String targetTopic = internalProducer.schema.getTargetTopic(next);
checkErroneous();
byte[] serializedKey = schema.serializeKey(value);
byte[] serializedValue = schema.serializeValue(value);
String targetTopic = schema.getTargetTopic(value);
if (targetTopic == null) {
targetTopic = internalProducer.defaultTopicId;
targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
timestamp = elementTimestamp;
timestamp = context.timestamp();
}
ProducerRecord<byte[], byte[]> record;
int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
internalProducer.topicPartitionsMap.put(targetTopic, partitions);
partitions = getPartitionsByTopic(targetTopic, producer);
topicPartitionsMap.put(targetTopic, partitions);
}
if (internalProducer.flinkKafkaPartitioner == null) {
if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
}
if (internalProducer.flushOnCheckpoint) {
synchronized (internalProducer.pendingRecordsLock) {
internalProducer.pendingRecords++;
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
internalProducer.producer.send(record, internalProducer.callback);
}
// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
// ---- Configuration setters
/**
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, exceptions will only be logged; if set to false,
* exceptions will eventually be thrown and cause the streaming program to
* fail (and enter recovery).
*
* <p>Method is only accessible for approach (a) (see above)
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.setLogFailuresOnly(logFailuresOnly);
}
/**
* If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
* to be acknowledged by the Kafka producer on a checkpoint.
* This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
*
* <p>Method is only accessible for approach (a) (see above)
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.setFlushOnCheckpoint(flush);
}
/**
* This method is used for approach (a) (see above).
*/
@Override
public void open(Configuration parameters) throws Exception {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.open(parameters);
}
/**
* This method is used for approach (a) (see above).
*/
@Override
public IterationRuntimeContext getIterationRuntimeContext() {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
return internalProducer.getIterationRuntimeContext();
}
/**
* This method is used for approach (a) (see above).
*/
@Override
public void setRuntimeContext(RuntimeContext t) {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.setRuntimeContext(t);
}
/**
* Invoke method for using the Sink as DataStream.addSink() sink.
*
* <p>This method is used for approach (a) (see above)
*
* @param value The input record.
*/
@Override
public void invoke(T value) throws Exception {
invokeInternal(value, Long.MAX_VALUE);
}
// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
/**
* Process method for using the sink with timestamp support.
*
* <p>This method is used for approach (b) (see above)
*/
@Override
public void processElement(StreamRecord<T> element) throws Exception {
invokeInternal(element.getValue(), element.getTimestamp());
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.initializeState(context);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
internalProducer.snapshotState(context);
producer.send(record, callback);
}
/**
* Configuration object returned by the writeToKafkaWithTimestamps() call.
*
* <p>This is only kept because it's part of the public API. It is not necessary anymore, now
* that the {@link SinkFunction} interface provides timestamps.
*/
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
private final FlinkKafkaProducerBase wrappedProducerBase;
private final FlinkKafkaProducer010 producer;
private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
private FlinkKafkaProducer010Configuration(
DataStreamSink originalSink,
DataStream<T> inputStream,
FlinkKafkaProducer010<T> producer) {
//noinspection unchecked
super(stream, producer);
super(inputStream, originalSink.getTransformation().getOperator());
this.producer = producer;
this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
}
/**
@@ -450,7 +344,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
producer.setLogFailuresOnly(logFailuresOnly);
}
/**
@@ -461,7 +355,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
this.wrappedProducerBase.setFlushOnCheckpoint(flush);
producer.setFlushOnCheckpoint(flush);
}
/**
@@ -471,7 +365,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.producer.writeTimestampToKafka = writeTimestampToKafka;
producer.writeTimestampToKafka = writeTimestampToKafka;
}
}
......
@@ -276,7 +276,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
* The incoming data
*/
@Override
public void invoke(IN next) throws Exception {
public void invoke(IN next, Context context) throws Exception {
// propagate asynchronous errors
checkErroneous();
......
@@ -215,6 +215,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* This test sets KafkaProducer so that it will not automatically flush the data and
* simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer
* flushed records manually on snapshotState.
*
* <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
* parameter controls which method is used.
*/
protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
......
@@ -43,14 +43,9 @@ public class SinkContextUtil {
}
@Override
public long timestamp() {
public Long timestamp() {
return timestamp;
}
@Override
public boolean hasTimestamp() {
return true;
}
};
}
}
@@ -31,22 +31,22 @@ import java.io.Serializable;
public interface SinkFunction<IN> extends Function, Serializable {
/**
* Function for standard sink behaviour. This function is called for every record.
*
* @param value The input record.
* @throws Exception
* @deprecated Use {@link #invoke(Object, Context)}.
*/
@Deprecated
default void invoke(IN value) throws Exception {
}
default void invoke(IN value) throws Exception {}
/**
* Writes the given value to the sink. This function is called for every record.
*
* <p>You have to override this method when implementing a {@code SinkFunction}, this is a
* {@code default} method for backward compatibility with the old-style method only.
*
* @param value The input record.
* @param context Additional context about the input record.
* @throws Exception
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
default void invoke(IN value, Context context) throws Exception {
invoke(value);
@@ -72,15 +72,9 @@ public interface SinkFunction<IN> extends Function, Serializable {
long currentWatermark();
/**
* Returns the timestamp of the current input record.
*/
long timestamp();
/**
* Checks whether this record has a timestamp.
*
* @return True if the record has a timestamp, false if not.
* Returns the timestamp of the current input record or {@code null} if the element does not
* have an assigned timestamp.
*/
boolean hasTimestamp();
Long timestamp();
}
}
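
For sink implementers, a minimal sketch of a sink written against the new interface, handling the now-nullable timestamp. PrintTimestampSink is a hypothetical example, not part of this commit:

    // Assumes: import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    public class PrintTimestampSink implements SinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            Long timestamp = context.timestamp();   // null if the record carries no timestamp
            if (timestamp != null) {
                System.out.println(value + " @ " + timestamp);
            } else {
                System.out.println(value + " (no timestamp)");
            }
        }
    }
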
@@ -91,19 +91,11 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
}
@Override
public long timestamp() {
if (!element.hasTimestamp()) {
throw new IllegalStateException(
"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
public Long timestamp() {
if (element.hasTimestamp()) {
return element.getTimestamp();
}
return element.getTimestamp();
}
public boolean hasTimestamp() {
return element.hasTimestamp();
return null;
}
}
}
@@ -96,7 +96,8 @@ public class StreamSinkOperatorTest extends TestLogger {
@Override
public void invoke(
T value, Context context) throws Exception {
if (context.hasTimestamp()) {
Long timestamp = context.timestamp();
if (timestamp != null) {
data.add(
new Tuple4<>(
context.currentWatermark(),
......