提交 b9892a0e 编写于 作者: S Stephan Ewen

[FLINK-2386] [kafka connector] Refactor, cleanup, and fix kafka consumers

上级 33f4c818
......@@ -96,6 +96,14 @@ under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
* This package (and its sub-packages) contain only classes that arecopied from
* the Apache Kafka project.
......@@ -7,4 +24,4 @@
* This is a temporary workaround!
package org.apache.flink.kafka_backport;
\ No newline at end of file
package org.apache.flink.kafka_backport;
......@@ -14,18 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumerBase<T> {
* Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
* The consumer will internally use the old low-level Kafka API, and manually commit offsets
* partition offsets to ZooKeeper.
* <p>The following additional configuration values are available:</p>
* <ul>
* <li>socket.timeout.ms</li>
* <li>socket.receive.buffer.bytes</li>
* <li>fetch.message.max.bytes</li>
* <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
* <li>fetch.wait.max.ms</li>
* </ul>
* @param <T> The type of elements produced by this consumer.
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
private static final long serialVersionUID = -5649906773771949146L;
* Creates a new Kafka 0.8.1.x streaming source consumer.
* @param topic
* The name of the topic that should be consumed.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props);
this.offsetStore = OffsetStore.FLINK_ZOOKEEPER;
this.fetcherType = FetcherType.LEGACY;
super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
......@@ -14,25 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
* Kafka consumer for Kafka 0.8.2.x
* It commits the offsets to Zookeeper (Flink code) and uses the consumer of Kafka 0.8.3 (currently WIP).
* Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
* The consumer will use the new Kafka consumer API (early Flink backport version),
* and manually commit offsets partition offsets to ZooKeeper.
* @param <T>
* @param <T> The type of elements produced by this consumer.
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumerBase<T> {
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
private static final long serialVersionUID = -8450689820627198228L;
* Creates a new Kafka 0.8.2.x streaming source consumer.
* @param topic
* The name of the topic that should be consumed.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props);
this.offsetStore = OffsetStore.FLINK_ZOOKEEPER;
this.fetcherType = FetcherType.INCLUDED;
super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.NEW_HIGH_LEVEL);
......@@ -14,18 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
public class FlinkKafkaConsumer083<T> extends FlinkKafkaConsumerBase<T> {
* Creates a Kafka consumer compatible with reading from Kafka 0.8.3.x brokers.
* The consumer will use the new Kafka consumer API (early Flink backport version),
* and lets Kafka handle the offset committing internally.
* @param <T> The type of elements produced by this consumer.
public class FlinkKafkaConsumer083<T> extends FlinkKafkaConsumer<T> {
private static final long serialVersionUID = 1126432820518992927L;
* Creates a new Kafka 0.8.3.x streaming source consumer.
* @param topic
* The name of the topic that should be consumed.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
public FlinkKafkaConsumer083(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props);
this.offsetStore = OffsetStore.KAFKA;
this.fetcherType = FetcherType.INCLUDED;
super(topic, valueDeserializer, props, OffsetStore.KAFKA, FetcherType.NEW_HIGH_LEVEL);
......@@ -14,51 +14,70 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.kafka_backport.common.TopicPartition;
import java.io.IOException;
import java.util.List;
import java.util.Map;
* A fetcher pulls data from Kafka, from a fix set of partitions.
* The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
public interface Fetcher {
* Set which partitions we want to read from
* @param partitions
void partitionsToRead(List<TopicPartition> partitions);
* Ask the run() method to stop reading
* Set which partitions the fetcher should pull from.
* @param partitions The list of partitions for a topic that the fetcher will pull from.
void stop();
void setPartitionsToRead(List<TopicPartition> partitions);
* Close the underlying connection
* Closes the fetcher. This will stop any operation in the
* {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
* close underlying connections and release all resources.
void close();
void close() throws IOException;
* Start and fetch indefinitely from the underlying fetcher
* @param sourceContext
* @param valueDeserializer
* @param lastOffsets
* @param <T>
* Starts fetch data from Kafka and emitting it into the stream.
* <p>To provide exactly once guarantees, the fetcher needs emit a record and update the update
* of the last consumed offset in one atomic operation:</p>
* <pre>{@code
* while (running) {
* T next = ...
* long offset = ...
* int partition = ...
* synchronized (sourceContext.getCheckpointLock()) {
* sourceContext.collect(next);
* lastOffsets[partition] = offset;
* }
* }
* }</pre>
* @param sourceContext The source context to emit elements to.
* @param valueDeserializer The deserializer to decode the raw values with.
* @param lastOffsets The array into which to store the offsets foe which elements are emitted.
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets);
* Commit offset (if supported)
* @param offsetsToCommit
void commit(Map<TopicPartition, Long> offsetsToCommit);
<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer,
long[] lastOffsets) throws Exception;
* Set offsets for the partitions.
* The offset is the next offset to read. So if set to 0, the Fetcher's first result will be the msg with offset=0.
* Set the next offset to read from for the given partition.
* For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
* will be the message with <i>offset=n</i>.
* @param topicPartition The partition for which to seek the offset.
* @param offsetToRead To offset to seek to.
void seek(TopicPartition topicPartition, long offsetToRead);
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka_backport.common.PartitionInfo;
import org.apache.flink.kafka_backport.common.TopicPartition;
import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
* When using the legacy fetcher, the following additional configuration values are available:
* - socket.timeout.ms
* - socket.receive.buffer.bytes
* - fetch.message.max.bytes
* - auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)
* - flink.kafka.consumer.queue.size (Size of the queue between the fetching threads)
* - fetch.wait.max.ms
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable {
public static Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
static final long OFFSET_NOT_SET = -1L;
private final String topic;
private final Properties props;
private final int[] partitions; // ordered, immutable partition assignment list
private final DeserializationSchema<T> valueDeserializer;
private transient Fetcher fetcher;
private final LinkedMap pendingCheckpoints = new LinkedMap();
private long[] lastOffsets;
protected long[] commitedOffsets;
private ZkClient zkClient;
private long[] restoreToOffset;
protected OffsetStore offsetStore = OffsetStore.FLINK_ZOOKEEPER;
protected FetcherType fetcherType = FetcherType.LEGACY;
private boolean isNoOp = false; // no-operation instance (for example when there are fewer partitions that flink consumers)
private boolean closed = false;
public enum OffsetStore {
Let Flink manage the offsets. It will store them in Zookeeper, in the same structure as Kafka 0.8.2.x
Use this mode when using the source with Kafka 0.8.x brokers
Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different mechanism are used (broker coordinator, zookeeper)
public enum FetcherType {
LEGACY, /* Use this fetcher for Kafka 0.8.1 brokers */
INCLUDED /* This fetcher works with Kafka 0.8.2 and 0.8.3 */
public FlinkKafkaConsumerBase(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this.topic = topic;
this.props = props; // TODO check for zookeeper properties
this.valueDeserializer = valueDeserializer;
// Connect to a broker to get the partitions
List<PartitionInfo> partitionInfos = getPartitionsForTopic(this.topic, this.props);
// get initial partitions list. The order of the partitions is important for consistent partition id assignment
// in restart cases.
// Note that the source will fail (= job will restart) in case of a broker failure
partitions = new int[partitionInfos.size()];
for(int i = 0; i < partitionInfos.size(); i++) {
partitions[i] = partitionInfos.get(i).partition();
LOG.info("Topic {} has {} partitions", topic, partitions.length);
// ----------------------------- Source ------------------------------
public void open(Configuration parameters) throws Exception {
// make sure that we take care of the committing
props.setProperty("enable.auto.commit", "false");
// create fetcher
fetcher = new IncludedFetcher(props);
case LEGACY:
fetcher = new LegacyFetcher(topic, props);
throw new RuntimeException("Requested unknown fetcher "+fetcher);
// tell which partitions we want to subscribe
List<TopicPartition> partitionsToSub = assignPartitions(this.partitions);
LOG.info("This instance (id={}) is going to subscribe to partitions {}", getRuntimeContext().getIndexOfThisSubtask(), partitionsToSub);
if(partitionsToSub.size() == 0) {
LOG.info("This instance is a no-op instance.");
isNoOp = true;
// set up operator state
lastOffsets = new long[partitions.length];
Arrays.fill(lastOffsets, OFFSET_NOT_SET);
// prepare Zookeeper
if(offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
String zkConnect = props.getProperty("zookeeper.connect");
if(zkConnect == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
zkClient = new ZkClient(zkConnect,
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
new KafkaZKStringSerializer());
commitedOffsets = new long[partitions.length];
// seek to last known pos, from restore request
if(restoreToOffset != null) {
LOG.info("Found offsets to restore to: "+Arrays.toString(restoreToOffset));
for(int i = 0; i < restoreToOffset.length; i++) {
if(restoreToOffset[i] != OFFSET_NOT_SET) {
// if this fails because we are not subscribed to the topic, the partition assignment is not deterministic!
// we set the offset +1 here, because seek() is accepting the next offset to read, but the restore offset is the last read offset
fetcher.seek(new TopicPartition(topic, i), restoreToOffset[i] + 1);
} else {
// no restore request. See what we have in ZK for this consumer group. In the non ZK case, Kafka will take care of this.
if(offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
for (TopicPartition tp : partitionsToSub) {
long offset = getOffset(zkClient, props.getProperty(ConsumerConfig.GROUP_ID_CONFIG), topic, tp.partition());
if (offset != OFFSET_NOT_SET) {
LOG.info("Offset for partition {} was set to {} in ZK. Seeking consumer to that position", tp.partition(), offset);
// the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset.
fetcher.seek(tp, offset + 1);
public void run(SourceContext<T> sourceContext) throws Exception {
if(isNoOp) {
fetcher.run(sourceContext, valueDeserializer, lastOffsets);
public void cancel() {
if(isNoOp) {
public void close() throws Exception {
closed = true;
public TypeInformation getProducedType() {
return valueDeserializer.getProducedType();
public List<TopicPartition> assignPartitions(int[] parts) {
LOG.info("Assigning partitions from "+Arrays.toString(parts));
List<TopicPartition> partitionsToSub = new ArrayList<TopicPartition>();
int machine = 0;
for(int i = 0; i < parts.length; i++) {
if(machine == getRuntimeContext().getIndexOfThisSubtask()) {
partitionsToSub.add(new TopicPartition(topic, parts[i]));
if(machine == getRuntimeContext().getNumberOfParallelSubtasks()) {
machine = 0;
return partitionsToSub;
// ----------------------------- Utilities -------------------------
* Send request to Kafka cluster to get partitions for topic.
protected static List<PartitionInfo> getPartitionsForTopic(String topic, Properties properties) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(properties, null,
new ByteArrayDeserializer(), new ByteArrayDeserializer());
try {
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
if (partitions == null) {
throw new RuntimeException("The topic " + topic + " does not seem to exist");
if(partitions.size() == 0) {
throw new RuntimeException("The topic "+topic+" does not seem to have any partitions");
return partitions;
} finally {
// ----------------------------- State ------------------------------
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if(fetcher == null) {
LOG.info("notifyCheckpointComplete() called on uninitialized source");
if(closed) {
LOG.info("notifyCheckpointComplete() called on closed source");
LOG.info("Commit checkpoint {}", checkpointId);
long[] checkpointOffsets;
// the map may be asynchronously updates when snapshotting state, so we synchronize
synchronized (pendingCheckpoints) {
final int posInMap = pendingCheckpoints.indexOf(checkpointId);
if (posInMap == -1) {
LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
// remove older checkpoints in map:
if (!pendingCheckpoints.isEmpty()) {
for(int i = 0; i < posInMap; i++) {
if (LOG.isInfoEnabled()) {
LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
if(offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
} else {
Map<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
for(int i = 0; i < checkpointOffsets.length; i++) {
if(checkpointOffsets[i] != OFFSET_NOT_SET) {
offsetsToCommit.put(new TopicPartition(topic, i), checkpointOffsets[i]);
public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
if (lastOffsets == null) {
LOG.warn("State snapshot requested on not yet opened source. Returning null");
return null;
if(closed) {
LOG.info("snapshotState() called on closed source");
return null;
if (LOG.isInfoEnabled()) {
LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}",
Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
// the map may be asynchronously updated when committing to Kafka, so we synchronize
synchronized (pendingCheckpoints) {
pendingCheckpoints.put(checkpointId, currentOffsets);
return currentOffsets;
public void restoreState(long[] restoredOffsets) {
restoreToOffset = restoredOffsets;
// ---------- Zookeeper communication ----------------
private void setOffsetsInZooKeeper(long[] offsets) {
for (int partition = 0; partition < offsets.length; partition++) {
long offset = offsets[partition];
if(offset != OFFSET_NOT_SET) {
setOffset(partition, offset);
protected void setOffset(int partition, long offset) {
// synchronize because notifyCheckpointComplete is called using asynchronous worker threads (= multiple checkpoints might be confirmed concurrently)
synchronized (commitedOffsets) {
if(closed) {
// it might happen that the source has been closed while the asynchronous commit threads waited for committing the offsets.
LOG.warn("setOffset called on closed source");
if(commitedOffsets[partition] < offset) {
setOffset(zkClient, props.getProperty(ConsumerConfig.GROUP_ID_CONFIG), topic, partition, offset);
commitedOffsets[partition] = offset;
} else {
LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
// the following two methods are static to allow access from the outside as well (Testcases)
* This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
if(data._1().isEmpty()) {
} else {
return Long.valueOf(data._1().get());
// ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there) -----------------
public static class KafkaZKStringSerializer implements ZkSerializer {
public byte[] serialize(Object data) throws ZkMarshallingError {
try {
return ((String) data).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
if (bytes == null) {
return null;
} else {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
......@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
......@@ -24,75 +25,87 @@ import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka_backport.common.TopicPartition;
import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class IncludedFetcher implements Fetcher {
public static Logger LOG = LoggerFactory.getLogger(IncludedFetcher.class);
* A fetcher that uses the new Kafka consumer API to fetch data for a specifies set of partitions.
public class NewConsumerApiFetcher implements Fetcher, OffsetHandler {
public final static String POLL_TIMEOUT = "flink.kafka.consumer.poll.timeout";
public final static long DEFAULT_POLL_TIMEOUT = 50;
private static final String POLL_TIMEOUT_PROPERTY = "flink.kafka.consumer.poll.timeout";
private static final long DEFAULT_POLL_TIMEOUT = 50;
private static final ByteArrayDeserializer NO_OP_SERIALIZER = new ByteArrayDeserializer();
final KafkaConsumer<byte[], byte[]> fetcher;
final Properties props;
boolean running = true;
private final KafkaConsumer<byte[], byte[]> fetcher;
private final long pollTimeout;
private volatile boolean running = true;
public IncludedFetcher(Properties props) {
this.props = props;
fetcher = new KafkaConsumer<byte[], byte[]>(props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
public NewConsumerApiFetcher(Properties props) {
this.pollTimeout = props.contains(POLL_TIMEOUT_PROPERTY) ?
Long.valueOf(props.getProperty(POLL_TIMEOUT_PROPERTY)) :
this.fetcher = new KafkaConsumer<byte[], byte[]>(props, null, NO_OP_SERIALIZER, NO_OP_SERIALIZER);
public void partitionsToRead(List<TopicPartition> partitions) {
fetcher.subscribe(partitions.toArray(new TopicPartition[partitions.size()]));
public void setPartitionsToRead(List<TopicPartition> partitions) {
synchronized (fetcher) {
if (fetcher.subscriptions().isEmpty()) {
fetcher.subscribe(partitions.toArray(new TopicPartition[partitions.size()]));
else {
throw new IllegalStateException("Fetcher has already subscribed to its set of partitions");
public void seek(TopicPartition topicPartition, long offsetToRead) {
fetcher.seek(topicPartition, offsetToRead);
synchronized (fetcher) {
fetcher.seek(topicPartition, offsetToRead);
public void close() {
running = false;
synchronized (fetcher) {
public <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
long pollTimeout = DEFAULT_POLL_TIMEOUT;
if(props.contains(POLL_TIMEOUT)) {
pollTimeout = Long.valueOf(props.getProperty(POLL_TIMEOUT));
while(running) {
public <T> void run(SourceFunction.SourceContext<T> sourceContext,
DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
while (running) {
// poll is always returning a new object.
ConsumerRecords<byte[], byte[]> consumed;
synchronized (fetcher) {
consumed = fetcher.poll(pollTimeout);
if(!consumed.isEmpty()) {
for(ConsumerRecord<byte[], byte[]> record : consumed) {
// synchronize inside the loop to allow checkpoints in between
synchronized (sourceContext.getCheckpointLock()) {
T value = valueDeserializer.deserialize(record.value());
lastOffsets[record.partition()] = record.offset();
final Iterator<ConsumerRecord<byte[], byte[]>> records = consumed.iterator();
while (running && records.hasNext()) {
ConsumerRecord<byte[], byte[]> record = records.next();
T value = valueDeserializer.deserialize(record.value());
// synchronize inside the loop to allow checkpoints in between batches
synchronized (sourceContext.getCheckpointLock()) {
lastOffsets[record.partition()] = record.offset();
public void stop() {
running = false;
......@@ -102,4 +115,9 @@ public class IncludedFetcher implements Fetcher {
public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
// no need to do anything here.
// if Kafka manages the offsets, it has them automatically
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import org.apache.flink.kafka_backport.common.TopicPartition;
import java.io.IOException;
import java.util.List;
import java.util.Map;
* The offset handler is responsible for locating the initial partition offsets
* where the source should start reading, as well as committing offsets from completed
* checkpoints.
public interface OffsetHandler {
* Commits the given offset for the partitions. May commit the offsets to the Kafka broker,
* or to ZooKeeper, based on its configured behavior.
* @param offsetsToCommit The offset to commit, per partition.
void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
* Positions the given fetcher to the initial read offsets where the stream consumption
* will start from.
* @param partitions The partitions for which to seeks the fetcher to the beginning.
* @param fetcher The fetcher that will pull data from Kafka and must be positioned.
void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
* Closes the offset handler, releasing all resources.
* @throws IOException Thrown, if the closing fails.
void close() throws IOException;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.nio.charset.Charset;
* Simple ZooKeeper serializer for Strings.
public class ZooKeeperStringSerializer implements ZkSerializer {
private static final Charset CHARSET = Charset.forName("UTF-8");
public byte[] serialize(Object data) {
if (data instanceof String) {
return ((String) data).getBytes(CHARSET);
else {
throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
public Object deserialize(byte[] bytes) {
if (bytes == null) {
return null;
else {
return new String(bytes, CHARSET);
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka_backport.common.TopicPartition;
import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class ZookeeperOffsetHandler implements OffsetHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
private final ZkClient zkClient;
private final String groupId;
public ZookeeperOffsetHandler(Properties props) {
this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
if (this.groupId == null) {
throw new IllegalArgumentException("Required property '"
+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
String zkConnect = props.getProperty("zookeeper.connect");
if (zkConnect == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
zkClient = new ZkClient(zkConnect,
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
new ZooKeeperStringSerializer());
public void commit(Map<TopicPartition, Long> offsetsToCommit) {
for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
TopicPartition tp = entry.getKey();
long offset = entry.getValue();
if (offset >= 0) {
setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
for (TopicPartition tp : partitions) {
long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
if (offset != OFFSET_NOT_SET) {
LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
tp.partition(), offset);
// the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset.
fetcher.seek(tp, offset + 1);
public void close() throws IOException {
// ------------------------------------------------------------------------
// Communication with Zookeeper
// ------------------------------------------------------------------------
public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
topicDirs.consumerOffsetDir() + "/" + tap.partition());
if (data._1().isEmpty()) {
} else {
return Long.valueOf(data._1().get());
......@@ -14,56 +14,87 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Arrays;
import org.junit.Test;
import java.util.Properties;
public class Kafka081ITCase extends KafkaTestBase {
public class Kafka081ITCase extends KafkaConsumerTestBase {
<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
return new TestFlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
// ------------------------------------------------------------------------
// Suite of Tests
// ------------------------------------------------------------------------
public void testCheckpointing() {
long[] getFinalOffsets() {
return TestFlinkKafkaConsumer081.finalOffset;
public void testOffsetInZookeeper() {
public void testConcurrentProducerConsumerTopology() {
void resetOffsets() {
TestFlinkKafkaConsumer081.finalOffset = null;
// --- canceling / failures ---
public void testCancelingEmptyTopic() {
public void testCancelingFullTopic() {
public void testFailOnDeploy() {
public static class TestFlinkKafkaConsumer081<OUT> extends FlinkKafkaConsumer081<OUT> {
public static long[] finalOffset;
public TestFlinkKafkaConsumer081(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
super(topicName, deserializationSchema, consumerConfig);
public void close() throws Exception {
synchronized (commitedOffsets) {
LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
if (finalOffset == null) {
finalOffset = new long[commitedOffsets.length];
for(int i = 0; i < commitedOffsets.length; i++) {
if(commitedOffsets[i] > 0) {
if(finalOffset[i] > 0) {
throw new RuntimeException("This is unexpected on i = "+i);
finalOffset[i] = commitedOffsets[i];
// --- source to partition mappings and exactly once ---
public void testOneToOneSources() {
public void testOneSourceMultiplePartitions() {
public void testMultipleSourcesOnePartition() {
// --- broker failure ---
public void testBrokerFailure() {
// --- special executions ---
public void testBigRecordJob() {
......@@ -14,57 +14,89 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Arrays;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Properties;
public class Kafka082ITCase extends KafkaTestBase {
public class Kafka082ITCase extends KafkaConsumerTestBase {
<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
return new TestFlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
return new FlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
long[] getFinalOffsets() {
return TestFlinkKafkaConsumer082.finalOffset;
// ------------------------------------------------------------------------
// Suite of Tests
// ------------------------------------------------------------------------
public void testCheckpointing() {
void resetOffsets() {
TestFlinkKafkaConsumer082.finalOffset = null;
public void testOffsetInZookeeper() {
public void testConcurrentProducerConsumerTopology() {
// --- canceling / failures ---
public void testCancelingEmptyTopic() {
public void testCancelingFullTopic() {
public static class TestFlinkKafkaConsumer082<OUT> extends FlinkKafkaConsumer082<OUT> {
private final static Object sync = new Object();
public static long[] finalOffset;
public TestFlinkKafkaConsumer082(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
super(topicName, deserializationSchema, consumerConfig);
public void close() throws Exception {
synchronized (commitedOffsets) {
LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
if (finalOffset == null) {
finalOffset = new long[commitedOffsets.length];
for(int i = 0; i < commitedOffsets.length; i++) {
if(commitedOffsets[i] > 0) {
if(finalOffset[i] > 0) {
throw new RuntimeException("This is unexpected on i = "+i);
finalOffset[i] = commitedOffsets[i];
public void testFailOnDeploy() {
// --- source to partition mappings and exactly once ---
public void testOneToOneSources() {
public void testOneSourceMultiplePartitions() {
public void testMultipleSourcesOnePartition() {
// --- broker failure ---
public void testBrokerFailure() {
// --- special executions ---
@Ignore("this does not work with the new consumer")
public void testBigRecordJob() {
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.Ignore;
import java.util.Arrays;
import java.util.Properties;
public class Kafka083ITCase extends KafkaTestBase {
<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
return new TestFlinkKafkaConsumer083<T>(topic, deserializationSchema, props);
long[] getFinalOffsets() {
return TestFlinkKafkaConsumer083.finalOffset;
void resetOffsets() {
TestFlinkKafkaConsumer083.finalOffset = null;
public static class TestFlinkKafkaConsumer083<OUT> extends FlinkKafkaConsumer083<OUT> {
public static long[] finalOffset;
public TestFlinkKafkaConsumer083(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
super(topicName, deserializationSchema, consumerConfig);
public void close() throws Exception {
synchronized (commitedOffsets) {
LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
if (finalOffset == null) {
finalOffset = new long[commitedOffsets.length];
for(int i = 0; i < commitedOffsets.length; i++) {
if(commitedOffsets[i] > 0) {
if(finalOffset[i] > 0) {
throw new RuntimeException("This is unexpected on i = "+i);
finalOffset[i] = commitedOffsets[i];
public void brokerFailureTest() throws Exception {
// Skipping test: The test is committing the offsets to the Kafka Broker.
// only 0.8.3 brokers support that.
public void testFlinkKafkaConsumerWithOffsetUpdates() throws Exception {
// Skipping test (see above)
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.kafka_backport.common.TopicPartition;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.*;
* Tests that the partition assignment is deterministic and stable.
public class KafkaConsumerPartitionAssignmentTest {
public void testPartitionsEqualConsumers() {
try {
int[] partitions = {4, 52, 17, 1};
for (int i = 0; i < partitions.length; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", partitions.length, i);
assertEquals(1, parts.size());
assertTrue(contains(partitions, parts.get(0).partition()));
catch (Exception e) {
public void testMultiplePartitionsPerConsumers() {
try {
final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
final Set<Integer> allPartitions = new HashSet<>();
for (int i : partitions) {
final int numConsumers = 3;
final int minPartitionsPerConsumer = partitions.length / numConsumers;
final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
for (int i = 0; i < numConsumers; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", numConsumers, i);
assertTrue(parts.size() >= minPartitionsPerConsumer);
assertTrue(parts.size() <= maxPartitionsPerConsumer);
for (TopicPartition p : parts) {
// check that the element was actually contained
// all partitions must have been assigned
catch (Exception e) {
public void testPartitionsFewerThanConsumers() {
try {
final int[] partitions = {4, 52, 17, 1};
final Set<Integer> allPartitions = new HashSet<>();
for (int i : partitions) {
final int numConsumers = 2 * partitions.length + 3;
for (int i = 0; i < numConsumers; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", numConsumers, i);
assertTrue(parts.size() <= 1);
for (TopicPartition p : parts) {
// check that the element was actually contained
// all partitions must have been assigned
catch (Exception e) {
public void testAssignEmptyPartitions() {
try {
List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
catch (Exception e) {
public void testGrowingPartitionsRemainsStable() {
try {
final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
final Set<Integer> allNewPartitions = new HashSet<>();
final Set<Integer> allInitialPartitions = new HashSet<>();
for (int i : newPartitions) {
for (int i : initialPartitions) {
final int numConsumers = 3;
final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 0);
List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 1);
List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 2);
assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
for (TopicPartition p : parts1) {
// check that the element was actually contained
for (TopicPartition p : parts2) {
// check that the element was actually contained
for (TopicPartition p : parts3) {
// check that the element was actually contained
// all partitions must have been assigned
// grow the set of partitions and distribute anew
List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 0);
List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 1);
List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 2);
// new partitions must include all old partitions
assertTrue(parts1new.size() > parts1.size());
assertTrue(parts2new.size() > parts2.size());
assertTrue(parts3new.size() > parts3.size());
assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
for (TopicPartition p : parts1new) {
// check that the element was actually contained
for (TopicPartition p : parts2new) {
// check that the element was actually contained
for (TopicPartition p : parts3new) {
// check that the element was actually contained
// all partitions must have been assigned
catch (Exception e) {
private static boolean contains(int[] array, int value) {
for (int i : array) {
if (i == value) {
return true;
return false;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.junit.Ignore;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Properties;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class KafkaConsumerTest {
public void testValidateZooKeeperConfig() {
try {
// empty
Properties emptyProperties = new Properties();
try {
fail("should fail with an exception");
catch (IllegalArgumentException e) {
// expected
// no connect string (only group string)
Properties noConnect = new Properties();
noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
try {
fail("should fail with an exception");
catch (IllegalArgumentException e) {
// expected
// no group string (only connect string)
Properties noGroup = new Properties();
noGroup.put("zookeeper.connect", "localhost:47574");
try {
fail("should fail with an exception");
catch (IllegalArgumentException e) {
// expected
catch (Exception e) {
public void testSnapshot() {
try {
Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
LinkedMap map = new LinkedMap();
offsetsField.set(consumer, testOffsets);
runningField.set(consumer, true);
mapField.set(consumer, map);
// make multiple checkpoints
for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
assertArrayEquals(testOffsets, checkpoint);
// change the offsets, make sure the snapshot did not change
long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
for (int i = 0; i < testOffsets.length; i++) {
testOffsets[i] += 1L;
assertArrayEquals(checkpointCopy, checkpoint);
assertTrue(map.size() > 0);
assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
catch (Exception e) {
@Ignore("Kafka consumer internally makes an infinite loop")
public void testCreateSourceWithoutCluster() {
try {
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:56794");
props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
props.setProperty("group.id", "non-existent-group");
new FlinkKafkaConsumer<String>("no op topic", new JavaDefaultStringSchema(), props,
catch (Exception e) {
......@@ -30,6 +30,7 @@ public class KafkaLocalSystemTime implements Time {
return System.currentTimeMillis();
public long nanoseconds() {
return System.nanoTime();
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.testutils.SuccessException;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class KafkaProducerITCase extends KafkaTestBase {
* <pre>
* +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
* / | \
* / | \
* (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
* \ | /
* \ | /
* +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
* </pre>
* The mapper validates that the values come consistently from the correct Kafka partition.
* The final sink validates that there are no duplicates and that all partitions are present.
public void testCustomPartitioning() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
final String topic = "customPartitioningTestTopic";
final int parallelism = 3;
createTestTopic(topic, parallelism, 1);
TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
// ------ producing topology ---------
// source has DOP 1 to make sure it generates no duplicates
DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
private boolean running = true;
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
long cnt = 0;
while (running) {
ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
public void cancel() {
running = false;
// sink partitions into
stream.addSink(new KafkaSink<Tuple2<Long, String>>(
brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
// ------ consuming topology ---------
FlinkKafkaConsumer<Tuple2<Long, String>> source =
new FlinkKafkaConsumer<>(topic, deserSchema, standardProps,
// mapper that validates partitioning and maps to partition
.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
private int ourPartition = -1;
public Integer map(Tuple2<Long, String> value) {
int partition = value.f0.intValue() % parallelism;
if (ourPartition != -1) {
assertEquals("inconsistent partitioning", ourPartition, partition);
} else {
ourPartition = partition;
return partition;
.addSink(new SinkFunction<Integer>() {
private int[] valuesPerPartition = new int[parallelism];
public void invoke(Integer value) throws Exception {
boolean missing = false;
for (int i : valuesPerPartition) {
if (i < 100) {
missing = true;
if (!missing) {
throw new SuccessException();
tryExecute(env, "custom partitioning test");
LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
catch (Exception e) {
// ------------------------------------------------------------------------
public static class CustomPartitioner implements SerializableKafkaPartitioner {
private final int expectedPartitions;
public CustomPartitioner(int expectedPartitions) {
this.expectedPartitions = expectedPartitions;
public int partition(Object key, int numPartitions) {
Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
assertEquals(expectedPartitions, numPartitions);
return (int) (tuple.f0 % numPartitions);
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.internals;
import kafka.admin.AdminUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.KafkaTestBase;
import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
public void runOffsetManipulationinZooKeeperTest() {
try {
final String topicName = "ZookeeperOffsetHandlerTest-Topic";
final String groupId = "ZookeeperOffsetHandlerTest-Group";
final long offset = (long) (Math.random() * Long.MAX_VALUE);
ZkClient zkClient = createZookeeperClient();
AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
assertEquals(offset, fetchedOffset);
catch (Exception e) {
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.testutils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.KafkaSink;
import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import java.util.Random;
public class DataGenerators {
public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
String brokerConnection, String topic,
int numPartitions,
final int from, final int to) throws Exception {
TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
private volatile boolean running = true;
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
int cnt = from;
int partition = getRuntimeContext().getIndexOfThisSubtask();
while (running && cnt <= to) {
ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
public void cancel() {
running = false;
stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
new Tuple2Partitioner(numPartitions)
env.execute("Data generator (Int, Int) stream to topic " + topic);
// ------------------------------------------------------------------------
public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
String brokerConnection, String topic,
final int numPartitions,
final int numElements,
final boolean randomizeOrder) throws Exception {
DataStream<Integer> stream = env.addSource(
new RichParallelSourceFunction<Integer>() {
private volatile boolean running = true;
public void run(SourceContext<Integer> ctx) {
// create a sequence
int[] elements = new int[numElements];
for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
i < numElements;
i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
elements[i] = val;
// scramble the sequence
if (randomizeOrder) {
Random rnd = new Random();
for (int i = 0; i < elements.length; i++) {
int otherPos = rnd.nextInt(elements.length);
int tmp = elements[i];
elements[i] = elements[otherPos];
elements[otherPos] = tmp;
// emit the sequence
int pos = 0;
while (running && pos < elements.length) {
public void cancel() {
running = false;
.addSink(new KafkaSink<>(brokerConnection, topic,
new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
new SerializableKafkaPartitioner() {
public int partition(Object key, int numPartitions) {
return ((Integer) key) % numPartitions;
env.execute("Scrambles int sequence generator");
// ------------------------------------------------------------------------
public static class InfiniteStringsGenerator extends Thread {
private final String kafkaConnectionString;
private final String topic;
private volatile Throwable error;
private volatile boolean running = true;
public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
this.kafkaConnectionString = kafkaConnectionString;
this.topic = topic;
public void run() {
// we manually feed data into the Kafka sink
KafkaSink<String> producer = null;
try {
producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
producer.open(new Configuration());
final StringBuilder bld = new StringBuilder();
final Random rnd = new Random();
while (running) {
int len = rnd.nextInt(100) + 1;
for (int i = 0; i < len; i++) {
bld.append((char) (rnd.nextInt(20) + 'a') );
String next = bld.toString();
catch (Throwable t) {
this.error = t;
finally {
if (producer != null) {
try {
catch (Throwable t) {
// ignore
public void shutdown() {
this.running = false;
public Throwable getError() {
return this.error;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.testutils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
* Sink function that discards data.
* @param <T> The type of the function.
public class DiscardingSink<T> implements SinkFunction<T> {
private static final long serialVersionUID = 2777597566520109843L;
public void invoke(T value) {}
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.testutils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
Checkpointed<Integer>, CheckpointNotifier, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
private static final long serialVersionUID = 6334389850158707313L;
public static volatile boolean failedBefore;
public static volatile boolean hasBeenCheckpointedBeforeFailure;
private final int failCount;
private int numElementsTotal;
private int numElementsThisTime;
private boolean failer;
private boolean hasBeenCheckpointed;
private Thread printer;
private volatile boolean printerRunning = true;
public FailingIdentityMapper(int failCount) {
this.failCount = failCount;
public void open(Configuration parameters) {
failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
printer = new Thread(this, "FailingIdentityMapper Status Printer");
public T map(T value) throws Exception {
if (!failedBefore) {
if (failer && numElementsTotal >= failCount) {
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
failedBefore = true;
throw new Exception("Artificial Test Failure");
return value;
public void close() throws Exception {
printerRunning = false;
if (printer != null) {
printer = null;
public void notifyCheckpointComplete(long checkpointId) {
this.hasBeenCheckpointed = true;
public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
return numElementsTotal;
public void restoreState(Integer state) {
numElementsTotal = state;
public void run() {
while (printerRunning) {
try {
catch (InterruptedException e) {
// ignore
LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
numElementsThisTime, numElementsTotal);
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.streaming.connectors.testutils;
* Exception that is thrown to terminate a program and indicate success.
public class SuccessException extends Exception {
private static final long serialVersionUID = -7011865671593955887L;
......@@ -16,7 +16,7 @@
# limitations under the License.
log4j.rootLogger=INFO, testlogger
log4j.rootLogger=INFo, testlogger
log4j.appender.testlogger.target = System.err
......@@ -24,4 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册