Commit 97ad55f1 authored by Robert Metzger, committed by Stephan Ewen

[FLINK-2372] Add new FlinkKafkaProducer based on the new producer API

This closes #1082
Parent 9292e5c4
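For orientation, a minimal usage sketch of the sink added by this commit. The broker address, topic name, and example elements are illustrative assumptions; JavaDefaultStringSchema stands in for any SerializationSchema<String, byte[]>.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class FlinkKafkaProducerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // toy input stream; any DataStream<String> will do
        DataStream<String> stream = env.fromElements("one", "two", "three");

        // broker list, topic id, serialization schema -- see the constructors in FlinkKafkaProducer below
        stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new JavaDefaultStringSchema()));

        env.execute("FlinkKafkaProducer example");
    }
}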
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import com.google.common.base.Preconditions;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.List;
/**
* Flink Sink to produce data into a Kafka topic.
*
* Please note that this producer does not have any reliability guarantees.
*
* @param <IN> Type of the messages to write into Kafka.
*/
public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
private static final long serialVersionUID = 1L;
/**
* Array with the partition IDs of the given topicId.
* The size of this array is the number of partitions.
*/
private final int[] partitions;
/**
* User defined properties for the Producer
*/
private final Properties producerConfig;
/**
* The name of the topic this producer is writing data to
*/
private String topicId;
/**
* (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
private SerializationSchema<IN, byte[]> schema;
/**
* User-provided partitioner for assigning an object to a Kafka partition.
*/
private KafkaPartitioner partitioner;
// -------------------------------- Runtime fields ------------------------------------------
/**
* KafkaProducer instance.
*/
private transient KafkaProducer<byte[], byte[]> producer;
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* @param brokerList
* Comma separated addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
*/
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
}
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
* @param producerConfig
* Properties with the producer configuration.
*/
public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, null);
}
/**
* The main constructor for creating a FlinkKafkaProducer.
*
* @param topicId The topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*/
public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
Preconditions.checkNotNull(topicId, "TopicID not set");
Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
Preconditions.checkNotNull(producerConfig, "producerConfig not set");
ClosureCleaner.ensureSerializable(customPartitioner);
ClosureCleaner.ensureSerializable(serializationSchema);
this.topicId = topicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
// set the producer configuration properties.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// create a local KafkaProducer to get the list of partitions.
// this will also ensure locally that all required ProducerConfig values are set.
{
KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
this.partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
getPartitionsProd.close();
}
if(customPartitioner == null) {
this.partitioner = new FixedPartitioner();
} else {
this.partitioner = customPartitioner;
}
}
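// Illustrative use of the constructor above; the broker address, topic name, and schema are
// placeholder assumptions:
//
//     Properties cfg = new Properties();
//     cfg.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//     // mySchema is any SerializationSchema<String, byte[]>; null selects the default FixedPartitioner
//     FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>("my-topic", mySchema, cfg, null);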
/**
* Initializes the connection to Kafka.
*/
@Override
public void open(Configuration configuration) {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
}
/**
* Called when new data arrives at the sink, and forwards it to Kafka.
*
* @param next
* The incoming data
*/
@Override
public void invoke(IN next) {
byte[] serialized = schema.serialize(next);
producer.send(new ProducerRecord<byte[], byte[]>(topicId,
partitioner.partition(next, partitions.length),
null,
serialized),
new ErrorLoggingCallback(topicId, null, serialized, false));
}
@Override
public void close() {
if (producer != null) {
producer.close();
}
}
// ----------------------------------- Utilities --------------------------
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
for(String broker: elements) {
NetUtils.getCorrectHostnamePort(broker);
}
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import com.google.common.base.Preconditions;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
/**
* Sink that emits its inputs to a Kafka topic.
*
* @param <IN>
* Type of the sink input
*/
public class KafkaSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
private Producer<IN, byte[]> producer;
private Properties userDefinedProperties;
private String topicId;
private String brokerList;
private SerializationSchema<IN, byte[]> schema;
private SerializableKafkaPartitioner partitioner;
private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
/**
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
* @param brokerList
* Addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
*/
public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
this(brokerList, topicId, new Properties(), serializationSchema);
}
/**
* Creates a KafkaSink for a given topic with custom Producer configuration.
* If you use this constructor, the broker should be set with the "metadata.broker.list"
* configuration.
*
* @param brokerList
* Addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param producerConfig
* Configurations of the Kafka producer
* @param serializationSchema
* User defined serialization schema.
*/
public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
String[] elements = brokerList.split(",");
for(String broker: elements) {
NetUtils.getCorrectHostnamePort(broker);
}
Preconditions.checkNotNull(topicId, "TopicID not set");
this.brokerList = brokerList;
this.topicId = topicId;
this.schema = serializationSchema;
this.partitionerClass = null;
this.userDefinedProperties = producerConfig;
}
/**
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
* @param brokerList
* Addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
* @param partitioner
* User defined partitioner.
*/
public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
this(brokerList, topicId, serializationSchema);
ClosureCleaner.ensureSerializable(partitioner);
this.partitioner = partitioner;
}
public KafkaSink(String brokerList,
String topicId,
SerializationSchema<IN, byte[]> serializationSchema,
Class<? extends SerializableKafkaPartitioner> partitioner) {
this(brokerList, topicId, serializationSchema);
this.partitionerClass = partitioner;
}
/**
* Initializes the connection to Kafka.
*/
@Override
public void open(Configuration configuration) {
Properties properties = new Properties();
properties.put("metadata.broker.list", brokerList);
properties.put("request.required.acks", "-1");
properties.put("message.send.max.retries", "10");
properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
// this will not be used as the key will not be serialized
properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
}
if (partitioner != null) {
properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
// java serialization will do the rest.
properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
}
if (partitionerClass != null) {
properties.put("partitioner.class", partitionerClass);
}
ProducerConfig config = new ProducerConfig(properties);
try {
producer = new Producer<IN, byte[]>(config);
} catch (NullPointerException e) {
throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
}
}
/**
* Called when new data arrives at the sink, and forwards it to Kafka.
*
* @param next
* The incoming data
*/
@Override
public void invoke(IN next) {
byte[] serialized = schema.serialize(next);
// Sending message without serializable key.
producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
}
@Override
public void close() {
if (producer != null) {
producer.close();
}
}
}
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka.api;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
/**
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
* This class will be removed in future releases of Flink.
*/
@Deprecated
public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.kafka.KafkaSink<IN> {
public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
super(brokerList, topicId, serializationSchema);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.partitioner;
import java.io.Serializable;
/**
* A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
*
* Note, one Kafka partition can contain multiple Flink partitions.
*
* Cases:
* # More Flink partitions than kafka partitions
* <pre>
* Flink Sinks: Kafka Partitions
* 1 ----------------> 1
* 2 --------------/
* 3 -------------/
* 4 ------------/
* </pre>
* --> Some (or all) Kafka partitions contain the output of more than one Flink partition.
*
* # Fewer Flink partitions than Kafka partitions
* <pre>
* Flink Sinks: Kafka Partitions
* 1 ----------------> 1
* 2 ----------------> 2
* 3
* 4
* 5
* </pre>
*
* --> Not all Kafka partitions contain data
* To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
* cause a lot of network connections between all the Flink instances and all the Kafka brokers).
*
*
*/
public class FixedPartitioner extends KafkaPartitioner implements Serializable {
private static final long serialVersionUID = 1627268846962918126L;
int targetPartition = -1;
@Override
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
int p = 0;
for(int i = 0; i < parallelInstances; i++) {
if(i == parallelInstanceId) {
targetPartition = partitions[p];
return;
}
if(++p == partitions.length) {
p = 0;
}
}
}
@Override
public int partition(Object element, int numPartitions) {
if(targetPartition == -1) {
throw new RuntimeException("The partitioner has not been initialized properly");
}
return targetPartition;
}
}
@@ -14,12 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.partitioner;
import kafka.producer.Partitioner;
import java.io.Serializable;
public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
/**
* Extended Kafka Partitioner.
* It contains an open() method which is called on each parallel instance.
* Partitioners have to be serializable!
*/
public abstract class KafkaPartitioner implements Partitioner, Serializable {
private static final long serialVersionUID = -1974260817778593473L;
/**
* Initializer for the Partitioner.
* @param parallelInstanceId 0-indexed id of the parallel instance in Flink
* @param parallelInstances the total number of parallel instances
* @param partitions an array describing the partition IDs of the available Kafka partitions.
*/
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
// override this method if needed.
}
}
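As an illustration of this extension point, a sketch of a custom partitioner in the same style as the test partitioners further below; the class name and the hash-based routing are assumptions, not something this commit ships.

import java.io.Serializable;

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

public class HashPartitioner extends KafkaPartitioner implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int partition(Object element, int numPartitions) {
        // map every element to a partition index in [0, numPartitions) via its hash code
        return Math.abs(element.hashCode() % numPartitions);
    }
}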
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.consumer.Consumer;
@@ -67,6 +66,7 @@ import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
import scala.collection.Seq;
@@ -290,7 +290,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
running = false;
}
});
stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
stream.addSink(new FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
// ----------- add consumer dataflow ----------
@@ -722,7 +722,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// add producing topology
Properties producerProps = new Properties();
producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -760,8 +761,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
});
stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
producerProps, deserSchema));
stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));
tryExecute(env, "big topology test");
@@ -806,6 +806,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
zkClient.close();
final String leaderToShutDown = firstPart.leader().get().connectionString();
final int leaderIdToShutDown = firstPart.leader().get().id();
LOG.info("Leader to shutdown {}", leaderToShutDown);
@@ -832,10 +833,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
BrokerKillingMapper.killedLeaderBefore = false;
tryExecute(env, "One-to-one exactly once test");
// this cannot be reliably checked, as checkpoints come in time intervals, and
// failures after a number of elements
// assertTrue("Job did not do a checkpoint before the failure",
// BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
// start a new broker:
brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
LOG.info("finished runBrokerFailureTest()");
}
@@ -920,9 +919,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}).setParallelism(parallelism);
stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
topicName,
new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
stream.addSink(new FlinkKafkaProducer<>(topicName,
new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
new Tuple2Partitioner(parallelism)
)).setParallelism(parallelism);
......
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
import java.io.Serializable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -100,8 +103,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
.setParallelism(1);
// sink partitions into
stream.addSink(new KafkaSink<Tuple2<Long, String>>(
brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
.setParallelism(parallelism);
// ------ consuming topology ---------
@@ -165,7 +167,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
// ------------------------------------------------------------------------
public static class CustomPartitioner implements SerializableKafkaPartitioner {
public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
private final int expectedPartitions;
......
@@ -96,9 +96,11 @@ public abstract class KafkaTestBase extends TestLogger {
protected static int flinkPort;
protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
protected static List<File> tmpKafkaDirs;
protected static String kafkaHost = "localhost";
// ------------------------------------------------------------------------
// Setup and teardown of the mini clusters
// ------------------------------------------------------------------------
@@ -119,14 +121,14 @@ public abstract class KafkaTestBase extends TestLogger {
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
List<File> tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
}
String kafkaHost = "localhost";
int zkPort = NetUtils.getAvailablePort();
zookeeperConnectionString = "localhost:" + zkPort;
@@ -241,7 +243,7 @@ public abstract class KafkaTestBase extends TestLogger {
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
*/
private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
String kafkaHost,
String zookeeperConnectionString) throws Exception {
Properties kafkaProperties = new Properties();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
public class TestFixedPartitioner {
/**
* <pre>
* Flink Sinks: Kafka Partitions
* 1 ----------------> 1
* 2 --------------/
* 3 -------------/
* 4 ------------/
* </pre>
*/
@Test
public void testMoreFlinkThanBrokers() {
FixedPartitioner part = new FixedPartitioner();
int[] partitions = new int[]{0};
part.open(0, 4, partitions);
Assert.assertEquals(0, part.partition("abc1", partitions.length));
part.open(1, 4, partitions);
Assert.assertEquals(0, part.partition("abc2", partitions.length));
part.open(2, 4, partitions);
Assert.assertEquals(0, part.partition("abc3", partitions.length));
Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
part.open(3, 4, partitions);
Assert.assertEquals(0, part.partition("abc4", partitions.length));
}
/**
*
* <pre>
* Flink Sinks: Kafka Partitions
* 1 ----------------> 1
* 2 ----------------> 2
* 3
* 4
* 5
*
* </pre>
*/
@Test
public void testFewerPartitions() {
FixedPartitioner part = new FixedPartitioner();
int[] partitions = new int[]{0, 1, 2, 3, 4};
part.open(0, 2, partitions);
Assert.assertEquals(0, part.partition("abc1", partitions.length));
Assert.assertEquals(0, part.partition("abc1", partitions.length));
part.open(1, 2, partitions);
Assert.assertEquals(1, part.partition("abc1", partitions.length));
Assert.assertEquals(1, part.partition("abc1", partitions.length));
}
/*
* Flink Sinks: Kafka Partitions
* 1 ------------>---> 1
* 2 -----------/----> 2
* 3 ----------/
*/
@Test
public void testMixedCase() {
FixedPartitioner part = new FixedPartitioner();
int[] partitions = new int[]{0,1};
part.open(0, 3, partitions);
Assert.assertEquals(0, part.partition("abc1", partitions.length));
part.open(1, 3, partitions);
Assert.assertEquals(1, part.partition("abc1", partitions.length));
part.open(2, 3, partitions);
Assert.assertEquals(0, part.partition("abc1", partitions.length));
}
}
@@ -26,8 +26,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
@@ -69,8 +69,9 @@ public class DataGenerators {
}
});
stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
stream.addSink(new FlinkKafkaProducer<>(topic,
new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
new Tuple2Partitioner(numPartitions)
));
@@ -131,9 +132,10 @@ public class DataGenerators {
stream
.rebalance()
.addSink(new KafkaSink<>(brokerConnection, topic,
.addSink(new FlinkKafkaProducer<>(topic,
new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
new SerializableKafkaPartitioner() {
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
new KafkaPartitioner() {
@Override
public int partition(Object key, int numPartitions) {
return ((Integer) key) % numPartitions;
@@ -164,9 +166,10 @@ public class DataGenerators {
@Override
public void run() {
// we manually feed data into the Kafka sink
KafkaSink<String> producer = null;
FlinkKafkaProducer<String> producer = null;
try {
producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
producer.setRuntimeContext(new MockRuntimeContext(1,0));
producer.open(new Configuration());
final StringBuilder bld = new StringBuilder();
......
@@ -19,13 +19,15 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import java.io.Serializable;
/**
* Special partitioner that uses the first field of a 2-tuple as the partition,
* and that expects a specific number of partitions.
*/
public class Tuple2Partitioner implements SerializableKafkaPartitioner {
public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
private static final long serialVersionUID = 1L;
......
@@ -36,5 +36,5 @@ public interface SerializationSchema<T, R> extends Serializable {
* The incoming element to be serialized
* @return The serialized element.
*/
public R serialize(T element);
R serialize(T element);
}