Commit 9f68e790
Authored Dec 18, 2017 by Tzu-Li (Gordon) Tai
Parent: b49ead38

[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour

5 changed files with 322 additions and 55 deletions (+322 / -55)
Changed files:
- docs/dev/connectors/kafka.md (+32 / -14)
- flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java (+73 / -10)
- flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java (+76 / -11)
- flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java (+70 / -8)
- flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java (+71 / -12)
docs/dev/connectors/kafka.md
...
...
@@ -462,11 +462,8 @@ FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema

-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false);   // "false" by default
-myProducer.setFlushOnCheckpoint(true);  // "false" by default
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true);

 stream.addSink(myProducer);
...
...
@@ -481,11 +478,8 @@ val myProducer = new FlinkKafkaProducer011[String](
         "my-topic",                  // target topic
         new SimpleStringSchema)      // serialization schema

-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false)   // "false" by default
-myProducer.setFlushOnCheckpoint(true)  // "false" by default
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)
...
...
@@ -505,11 +499,30 @@ are other constructor variants that allow providing the following:
   partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
   constructor. This partitioner will be called for each record in the stream
   to determine which exact partition of the target topic the record should be sent to.
   Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
 * *Advanced serialization schema*: Similar to the consumer,
   the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
   which allows serializing the key and value separately. It also allows to override the target topic,
   so that one producer instance can send data to multiple topics.
### Kafka Producer Partitioning Scheme

By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
(i.e., all records received by a sink subtask will end up in the same Kafka partition).

A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes.
Also, keep in mind that any state in the partitioner will be lost on job failures, since the partitioner
is not part of the producer's checkpointed state.

It is also possible to completely avoid using any kind of partitioner, and simply let Kafka partition
the written records by their attached key (as determined for each record using the provided serialization schema).
To do this, provide a `null` custom partitioner when instantiating the producer. It is important
to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified,
the `FlinkFixedPartitioner` is used instead.
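The default subtask-to-partition mapping described above can be sketched in plain Java. This is a simplified model of `FlinkFixedPartitioner`'s behavior, not Flink's actual class; the class and method names here are ours:

```java
// Simplified model of the default FlinkFixedPartitioner mapping:
// every record from a given sink subtask goes to one fixed partition,
// chosen as subtaskIndex modulo the number of target partitions.
class FixedPartitionerSketch {

    /** Returns the Kafka partition that a given sink subtask always writes to. */
    static int fixedPartition(int subtaskIndex, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return subtaskIndex % numPartitions;
    }

    public static void main(String[] args) {
        // 4 sink subtasks writing to a 3-partition topic:
        // each subtask's records all land in a single partition.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + fixedPartition(subtask, 3));
        }
    }
}
```

Note the consequence spelled out in the docs above: with more partitions than subtasks, some partitions receive no data at all from this scheme.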
### Kafka Producers and Fault Tolerance
#### Kafka 0.8
...
...
@@ -522,17 +535,22 @@ With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaProducer010`
 can provide at-least-once delivery guarantees.

-Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
-as shown in the above examples in the previous section:
+Besides enabling Flink's checkpointing, you should also configure the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.

- * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+   Enabling this will let the producer only log failures
   instead of catching and rethrowing them. This essentially accounts the record
   to have succeeded, even if it was never written to the target Kafka topic. This
   must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+   With this enabled, Flink's checkpoints will wait for any
   on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
   succeeding the checkpoint. This ensures that all records before the checkpoint have
   been written to Kafka. This must be enabled for at-least-once.

+In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
+0.9 and 0.10, `setLogFailuresOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+to `true`.

 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
 the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
...
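The interplay of the two setters can be modeled with a small sketch. This is our toy model of the at-least-once contract, not Flink's code; the class and method names are ours: with flush-on-checkpoint enabled, a checkpoint may only complete once every record sent before it has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not Flink's implementation) of the at-least-once contract:
// a producer tracks unacknowledged ("in-flight") records, and with
// flushOnCheckpoint enabled it refuses to complete a checkpoint while
// records are still in flight.
class FlushOnCheckpointSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final boolean flushOnCheckpoint;

    FlushOnCheckpointSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    void send(String record) {
        pending.add(record); // in flight until acked by the broker
    }

    void ack() {
        pending.poll(); // broker acknowledged the oldest in-flight record
    }

    /** Returns true if a checkpoint taken now would be allowed to complete. */
    boolean canCompleteCheckpoint() {
        // Without flushing, the checkpoint can succeed while records are
        // still in flight -- exactly the window where data can be lost.
        return !flushOnCheckpoint || pending.isEmpty();
    }
}
```

With `flushOnCheckpoint == false`, a checkpoint can succeed while a record is still unacknowledged; if the producer then crashes, the record is neither in Kafka nor replayed, which is why the docs require enabling it for at-least-once.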
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
...
...
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.kafka.clients.producer.ProducerRecord;

+import javax.annotation.Nullable;
+
 import java.util.Properties;
/**
...
...
@@ -124,12 +126,20 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param brokerList
     *          Comma separated addresses of the brokers
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
-    *          User defined (keyless) serialization schema.
+    *          User defined key-less serialization schema.
     */
    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
...
...
@@ -139,10 +149,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
     * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
-    *          User defined (keyless) serialization schema.
+    *          User defined key-less serialization schema.
     * @param producerConfig
     *          Properties with the producer configuration.
     */
...
...
@@ -151,15 +169,26 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
    }

    /**
-    * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-    * the topic.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+    * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+    * partitions in a round-robin fashion.
     *
     * @param topicId The topic to write data to
-    * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+    * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If set to {@code null}, records will be distributed to Kafka partitions
+    *                          in a round-robin fashion.
     */
-   public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+   public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
    }
...
...
@@ -169,6 +198,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param brokerList
     *          Comma separated addresses of the brokers
     * @param topicId
...
...
@@ -184,6 +221,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
...
...
@@ -196,11 +241,29 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
    }

    /**
-    * Create Kafka producer.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
     *
-    * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+    * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+    * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+    * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+    * will be distributed to Kafka partitions in a round-robin fashion.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If set to {@code null}, records will be partitioned by the key of each record
+    *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+    *                          are {@code null}, then records will be distributed to Kafka partitions in a
+    *                          round-robin fashion.
     */
-   public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+   public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
    }
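The fallback behavior this Javadoc describes, partition by the attached key when one exists and round-robin otherwise, can be sketched in a few lines. This is our simplified model, not the actual Kafka client's partitioner (which uses murmur2 hashing); the class and method names are ours:

```java
// Simplified model of partition choice for keyed records:
// records with a key hash to a stable partition; key-less records
// cycle through partitions round-robin.
class KeyedPartitionSketch {
    private int roundRobinCounter = 0;

    int choosePartition(byte[] key, int numPartitions) {
        if (key != null) {
            // stable key-based placement (sketch only; Kafka's
            // DefaultPartitioner uses murmur2 on the key bytes)
            return (java.util.Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // no key: spread records evenly across partitions
        return (roundRobinCounter++) % numPartitions;
    }
}
```

The point the Javadoc makes is visible in the sketch: two records with the same key always land in the same partition, while key-less records are spread across all partitions.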
...
...
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
...
...
@@ -277,13 +277,21 @@ public class FlinkKafkaProducer011<IN>
    }

    /**
-    * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
-    *          User defined (keyless) serialization schema.
+    *          User defined key-less serialization schema.
     * @param producerConfig
     *          Properties with the producer configuration.
     */
...
...
@@ -296,15 +304,26 @@ public class FlinkKafkaProducer011<IN>
    }

    /**
-    * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-    * the topic.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+    * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+    * partitions in a round-robin fashion.
     *
     * @param topicId The topic to write data to
-    * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+    * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If a partitioner is not provided, records will be distributed to Kafka partitions
+    *                          in a round-robin fashion.
     */
    public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
    }
...
...
@@ -314,6 +333,14 @@ public class FlinkKafkaProducer011<IN>
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+    *
     * @param brokerList
     *          Comma separated addresses of the brokers
     * @param topicId
...
...
@@ -333,6 +360,14 @@ public class FlinkKafkaProducer011<IN>
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
...
...
@@ -352,12 +387,22 @@ public class FlinkKafkaProducer011<IN>
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
     *          User defined serialization schema supporting key/value messages
     * @param producerConfig
     *          Properties with the producer configuration.
     * @param semantic
     *          Defines semantic that will be used by this producer (see {@link Semantic}).
     */
    public FlinkKafkaProducer011(String topicId,
...
...
@@ -374,12 +419,22 @@ public class FlinkKafkaProducer011<IN>
    /**
-    * The main constructor for creating a FlinkKafkaProducer.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+    * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+    * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+    * will be distributed to Kafka partitions in a round-robin fashion.
     *
     * @param defaultTopicId The default topic to write data to
     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If a partitioner is not provided, records will be partitioned by the key of each record
+    *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+    *                          are {@code null}, then records will be distributed to Kafka partitions in a
+    *                          round-robin fashion.
     */
    public FlinkKafkaProducer011(String defaultTopicId,
...
...
@@ -396,12 +451,22 @@ public class FlinkKafkaProducer011<IN>
    }

    /**
-    * The main constructor for creating a FlinkKafkaProducer.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+    * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+    * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+    * will be distributed to Kafka partitions in a round-robin fashion.
     *
     * @param defaultTopicId The default topic to write data to
     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If a partitioner is not provided, records will be partitioned by the key of each record
+    *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+    *                          are {@code null}, then records will be distributed to Kafka partitions in a
+    *                          round-robin fashion.
     * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
     * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
     */
...
...
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
...
...
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

+import javax.annotation.Nullable;
+
 import java.util.Properties;
/**
...
...
@@ -38,17 +40,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    private static final long serialVersionUID = 1L;

-   // ------------------- Keyless serialization schema constructors ----------------------
+   // ------------------- Key-less serialization schema constructors ----------------------

    /**
     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param brokerList
     *          Comma separated addresses of the brokers
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
-    *          User defined (keyless) serialization schema.
+    *          User defined key-less serialization schema.
     */
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
...
...
@@ -58,10 +69,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
-    *          User defined (keyless) serialization schema.
+    *          User defined key-less serialization schema.
     * @param producerConfig
     *          Properties with the producer configuration.
     */
...
...
@@ -70,14 +89,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    }

    /**
-    * The main constructor for creating a FlinkKafkaProducer.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+    * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+    * partitions in a round-robin fashion.
     *
     * @param topicId The topic to write data to
-    * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+    * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If set to {@code null}, records will be distributed to Kafka partitions
+    *                          in a round-robin fashion.
     */
-   public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+   public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
    }
...
...
@@ -88,6 +119,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param brokerList
     *          Comma separated addresses of the brokers
     * @param topicId
...
...
@@ -103,6 +142,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
     * the topic.
     *
+    * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+    * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+    * partition (i.e. all records received by a sink subtask will end up in the same
+    * Kafka partition).
+    *
+    * <p>To use a custom partitioner, please use
+    * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+    *
     * @param topicId
     *          ID of the Kafka topic.
     * @param serializationSchema
...
...
@@ -115,14 +162,29 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    }

    /**
-    * The main constructor for creating a FlinkKafkaProducer.
+    * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+    * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+    *
+    * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+    * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+    * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+    * will be distributed to Kafka partitions in a round-robin fashion.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+    *                          If set to {@code null}, records will be partitioned by the key of each record
+    *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+    *                          are {@code null}, then records will be distributed to Kafka partitions in a
+    *                          round-robin fashion.
     */
-   public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+   public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
    }
...
...
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
...
...
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

+import javax.annotation.Nullable;
+
 import java.util.Properties;
/**
...
...
@@ -38,31 +40,47 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {

	private static final long serialVersionUID = 1L;

-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
	 */
	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
	}
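The `FlinkFixedPartitioner` behaviour described in the new Javadoc (each sink subtask writes to exactly one Kafka partition) boils down to a fixed mapping from the subtask index to a partition. A minimal sketch, with an illustrative standalone class and method name, assuming the mapping is subtask index modulo partition count as the docs imply:

```java
// Sketch of the fixed subtask-to-partition mapping described above.
// The standalone form and names are illustrative, not Flink's actual API.
class FixedPartitionSketch {
	/** All records from one sink subtask map to one and the same partition. */
	public static int partitionFor(int subtaskIndex, int numPartitions) {
		if (numPartitions <= 0) {
			throw new IllegalArgumentException("numPartitions must be positive");
		}
		return subtaskIndex % numPartitions;
	}
}
```

One consequence worth noting: with fewer sink subtasks than topic partitions, some partitions under this scheme receive no data at all, which is exactly why the Javadoc points users at the custom-partitioner constructor.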
	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
	 * @param producerConfig
	 * 			Properties with the producer configuration.
	 */
...
...
@@ -71,15 +89,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
	}

	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
	 *
	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
	}
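The round-robin fallback described for key-less records in the Javadoc above implies that writes spread evenly over all partitions. A self-contained sketch of that behaviour (illustrative names, not Kafka's actual partitioner implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin distribution for key-less records, as described in
// the Javadoc above; names are illustrative only.
class RoundRobinSketch {
	private final AtomicInteger next = new AtomicInteger(0);

	public int nextPartition(int numPartitions) {
		// Successive records cycle through all partitions in order.
		return (next.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
	}

	/** Count how `records` consecutive writes spread over `numPartitions`. */
	public int[] distribute(int records, int numPartitions) {
		int[] counts = new int[numPartitions];
		for (int i = 0; i < records; i++) {
			counts[nextPartition(numPartitions)]++;
		}
		return counts;
	}
}
```

Unlike the default `FlinkFixedPartitioner`, this distribution touches every partition of the topic, but it offers no per-key ordering guarantee, since equal records may land in different partitions.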
...
...
@@ -90,6 +119,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
...
...
@@ -105,6 +142,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
...
...
@@ -117,15 +162,29 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
	}

	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
		super(topicId, serializationSchema, producerConfig, customPartitioner);
	}
...
...