diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java index e7ac47438a565508e48fc141b230146ab4e6a319..6813c62b3e25930cae01bef2dcdfb64f148bd7af 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.java.DataSet; diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index 2d1d91a61a74759d1e678cd6717e225daad58128..7ff5e296c6dfd966c001903296973ccda86a2769 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -33,8 +33,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.util.serialization.DeserializationSchema; @@ -43,16 +45,12 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW import org.apache.flink.util.NetUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -124,7 +122,7 @@ import static com.google.common.base.Preconditions.checkNotNull; * reach the Kafka brokers or ZooKeeper.

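For context on the hunks that follow: the central change in FlinkKafkaConsumer is that the operator state moves from a long[] indexed by partition id to a HashMap keyed by the new KafkaTopicPartition type, which is what makes multi-topic subscriptions (and sparse partition ids) possible. A minimal sketch of how that state behaves, using only the constructor and helpers introduced by this patch (topic names and offset values are made up):

```java
import java.util.HashMap;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class OffsetStateSketch {
    public static void main(String[] args) {
        // Offsets are keyed by (topic, partition) instead of being stored at index "partition id".
        HashMap<KafkaTopicPartition, Long> lastOffsets = new HashMap<>();
        lastOffsets.put(new KafkaTopicPartition("topicA", 0), 42L);
        lastOffsets.put(new KafkaTopicPartition("topicB", 3), 7L);

        // equals()/hashCode() are defined on (topic, partition), so a fresh instance
        // with the same coordinates finds the stored offset.
        Long offset = lastOffsets.get(new KafkaTopicPartition("topicA", 0)); // 42
        System.out.println(offset);

        // The static helper added in this patch renders such a map for log messages.
        System.out.println(KafkaTopicPartition.toString(lastOffsets));
    }
}
```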
*/ public class FlinkKafkaConsumer extends RichParallelSourceFunction - implements CheckpointNotifier, CheckpointedAsynchronously, ResultTypeQueryable { + implements CheckpointNotifier, CheckpointedAsynchronously>, ResultTypeQueryable { /** * The offset store defines how acknowledged offsets are committed back to Kafka. Different @@ -198,19 +196,17 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction /** The type of fetcher to be used to pull data from Kafka */ private final FetcherType fetcherType; - - /** name of the topic consumed by this source */ - private final String topic; + + /** List of partitions (including topics and leaders) to consume */ + private final List partitionInfos; /** The properties to parametrize the Kafka consumer and ZooKeeper client */ private final Properties props; - - /** The ids of the partitions that are read by this consumer */ - private final int[] partitions; - + /** The schema to convert between Kafka#s byte messages, and Flink's objects */ private final KeyedDeserializationSchema deserializer; + // ------ Runtime State ------- /** Data for pending but uncommitted checkpoints */ @@ -222,18 +218,18 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction /** The committer that persists the committed offsets */ private transient OffsetHandler offsetHandler; - /** The partitions actually handled by this consumer */ - private transient List subscribedPartitions; + /** The partitions actually handled by this consumer at runtime */ + private transient List subscribedPartitions; /** The offsets of the last returned elements */ - private transient long[] lastOffsets; + private transient HashMap lastOffsets; /** The latest offsets that have been committed to Kafka or ZooKeeper. These are never * newer then the last offsets (Flink's internal view is fresher) */ - private transient long[] commitedOffsets; + private transient HashMap committedOffsets; /** The offsets to restore to, if the consumer restores state from a checkpoint */ - private transient long[] restoreToOffset; + private transient HashMap restoreToOffset; private volatile boolean running = true; @@ -257,7 +253,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction * @param fetcherType * The type of fetcher to use (new high-level API, old low-level API). */ - public FlinkKafkaConsumer(String topic, DeserializationSchema deserializer, Properties props, + public FlinkKafkaConsumer(List topic, DeserializationSchema deserializer, Properties props, OffsetStore offsetStore, FetcherType fetcherType) { this(topic, new KeyedDeserializationSchemaWrapper<>(deserializer), props, offsetStore, fetcherType); @@ -269,8 +265,8 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction *

To determine which kind of fetcher and offset handler to use, please refer to the docs * at the beginning of this class.

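Since the constructors now take a list of topics, a minimal usage sketch of the list-based FlinkKafkaConsumer082 constructor added later in this patch may help; the broker/ZooKeeper addresses, topic names, and the SimpleStringSchema deserializer are illustrative assumptions, not part of the patch:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicReadExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder addresses
        props.setProperty("zookeeper.connect", "zookeeper1:2181");           // placeholder address
        props.setProperty("group.id", "example-group");

        // One consumer instance can now subscribe to several topics at once.
        FlinkKafkaConsumer082<String> consumer = new FlinkKafkaConsumer082<>(
                Arrays.asList("topicA", "topicB"),
                new SimpleStringSchema(),
                props);

        env.addSource(consumer).print();
        env.execute("Multi-topic Kafka read");
    }
}
```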
* - * @param topic - * The Kafka topic to read from. + * @param topics + * The Kafka topics to read from. * @param deserializer * The deserializer to turn raw byte messages into Java/Scala objects. * @param props @@ -280,7 +276,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction * @param fetcherType * The type of fetcher to use (new high-level API, old low-level API). */ - public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema deserializer, Properties props, + public FlinkKafkaConsumer(List topics, KeyedDeserializationSchema deserializer, Properties props, OffsetStore offsetStore, FetcherType fetcherType) { this.offsetStore = checkNotNull(offsetStore); this.fetcherType = checkNotNull(fetcherType); @@ -294,7 +290,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction "The Kafka offset handler cannot be used together with the old low-level fetcher."); } - this.topic = checkNotNull(topic, "topic"); + checkNotNull(topics, "topics"); this.props = checkNotNull(props, "props"); this.deserializer = checkNotNull(deserializer, "valueDeserializer"); @@ -303,25 +299,31 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction validateZooKeeperConfig(props); } - // Connect to a broker to get the partitions - List partitionInfos = getPartitionsForTopic(topic, props); + // Connect to a broker to get the partitions for all topics + this.partitionInfos = getPartitionsForTopic(topics, props); if (partitionInfos.size() == 0) { - throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." + + throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics.toString() + "." + "Please check previous log entries"); } - // get initial partitions list. The order of the partitions is important for consistent - // partition id assignment in restart cases. 
- this.partitions = new int[partitionInfos.size()]; - for (int i = 0; i < partitionInfos.size(); i++) { - partitions[i] = partitionInfos.get(i).partition(); - - if (partitions[i] >= partitions.length) { - throw new RuntimeException("Kafka partition numbers are sparse"); + if (LOG.isInfoEnabled()) { + Map countPerTopic = new HashMap<>(); + for (KafkaTopicPartitionLeader partition : partitionInfos) { + Integer count = countPerTopic.get(partition.getTopicPartition().getTopic()); + if (count == null) { + count = 1; + } else { + count++; + } + countPerTopic.put(partition.getTopicPartition().getTopic(), count); + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry e : countPerTopic.entrySet()) { + sb.append(e.getKey()).append(" (").append(e.getValue()).append("), "); } + LOG.info("Consumer is going to read the following topics (with number of partitions): ", sb.toString()); } - LOG.info("Topic {} has {} partitions", topic, partitions.length); } // ------------------------------------------------------------------------ @@ -333,19 +335,19 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction super.open(parameters); final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks(); - final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); + final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); // pick which partitions we work on - subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex); + subscribedPartitions = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); if (LOG.isInfoEnabled()) { LOG.info("Kafka consumer {} will read partitions {} out of partitions {}", - thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions)); + thisConsumerIndex, KafkaTopicPartitionLeader.toString(subscribedPartitions), this.partitionInfos.size()); } // we leave the fetcher as null, if we have no partitions if (subscribedPartitions.isEmpty()) { - LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex); + LOG.info("Kafka consumer {} has no partitions (empty source)", thisConsumerIndex); return; } @@ -354,12 +356,11 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction case NEW_HIGH_LEVEL: throw new UnsupportedOperationException("Currently unsupported"); case LEGACY_LOW_LEVEL: - fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName()); + fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName()); break; default: throw new RuntimeException("Requested unknown fetcher " + fetcher); } - fetcher.setPartitionsToRead(subscribedPartitions); // offset handling switch (offsetStore){ @@ -372,34 +373,29 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction throw new RuntimeException("Requested unknown offset store " + offsetStore); } - // set up operator state - lastOffsets = new long[partitions.length]; - commitedOffsets = new long[partitions.length]; - - Arrays.fill(lastOffsets, OFFSET_NOT_SET); - Arrays.fill(commitedOffsets, OFFSET_NOT_SET); - + committedOffsets = new HashMap<>(); + // seek to last known pos, from restore request if (restoreToOffset != null) { if (LOG.isInfoEnabled()) { - LOG.info("Consumer {} found offsets from previous checkpoint: {}", - thisComsumerIndex, Arrays.toString(restoreToOffset)); + LOG.info("Consumer {} is restored from previous checkpoint: {}", + thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset)); } - for (int i = 0; i < 
restoreToOffset.length; i++) { - long restoredOffset = restoreToOffset[i]; - if (restoredOffset != OFFSET_NOT_SET) { - // if this fails because we are not subscribed to the topic, then the - // partition assignment is not deterministic! - - // we set the offset +1 here, because seek() is accepting the next offset to read, - // but the restore offset is the last read offset - fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1); - lastOffsets[i] = restoredOffset; - } + for (Map.Entry restorePartition: restoreToOffset.entrySet()) { + // seek fetcher to restore position + // we set the offset +1 here, because seek() is accepting the next offset to read, + // but the restore offset is the last read offset + fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1); } + // initialize offsets with restored state + this.lastOffsets = restoreToOffset; + restoreToOffset = null; } else { + // start with empty offsets + lastOffsets = new HashMap<>(); + // no restore request. Let the offset handler take care of the initial offset seeking offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher); } @@ -409,7 +405,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction public void run(SourceContext sourceContext) throws Exception { if (fetcher != null) { // For non-checkpointed sources, a thread which periodically commits the current offset into ZK. - PeriodicOffsetCommitter offsetCommitter = null; + PeriodicOffsetCommitter offsetCommitter = null; // check whether we need to start the periodic checkpoint committer StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); @@ -418,7 +414,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction // Note that the default configuration value in Kafka is 60 * 1000, so we use the // same here. long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000")); - offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this); + offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this); offsetCommitter.setDaemon(true); offsetCommitter.start(); LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); @@ -504,7 +500,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction // ------------------------------------------------------------------------ @Override - public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public HashMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { if (lastOffsets == null) { LOG.debug("snapshotState() requested on not yet opened source; returning null."); return null; @@ -516,10 +512,12 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction if (LOG.isDebugEnabled()) { LOG.debug("Snapshotting state. 
Offsets: {}, checkpoint id: {}, timestamp: {}", - Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp); + KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp); } - long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length); + // the use of clone() is okay here is okay, we just need a new map, the keys are not changed + //noinspection unchecked + HashMap currentOffsets = (HashMap) lastOffsets.clone(); // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() @@ -533,7 +531,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction } @Override - public void restoreState(long[] restoredOffsets) { + public void restoreState(HashMap restoredOffsets) { restoreToOffset = restoredOffsets; } @@ -554,7 +552,7 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction } try { - long[] checkpointOffsets; + HashMap checkpointOffsets; // the map may be asynchronously updates when snapshotting state, so we synchronize synchronized (pendingCheckpoints) { @@ -563,39 +561,21 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); return; } - - checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap); + + //noinspection unchecked + checkpointOffsets = (HashMap) pendingCheckpoints.remove(posInMap); + // remove older checkpoints in map for (int i = 0; i < posInMap; i++) { pendingCheckpoints.remove(0); } } - - if (LOG.isInfoEnabled()) { - LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore); - } - - // build the map of (topic,partition) -> committed offset - Map offsetsToCommit = new HashMap<>(); - for (TopicPartition tp : subscribedPartitions) { - - int partition = tp.partition(); - long offset = checkpointOffsets[partition]; - long lastCommitted = commitedOffsets[partition]; - - if (offset != OFFSET_NOT_SET) { - if (offset > lastCommitted) { - offsetsToCommit.put(tp, offset); - LOG.debug("Committing offset {} for partition {}", offset, partition); - } - else { - LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition); - } - } + if (checkpointOffsets == null || checkpointOffsets.size() == 0) { + LOG.info("Checkpoint state was empty."); + return; } - - offsetHandler.commit(offsetsToCommit); + commitOffsets(checkpointOffsets, this); } catch (Exception e) { if (running) { @@ -609,16 +589,15 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction // Miscellaneous utilities // ------------------------------------------------------------------------ - protected static List assignPartitions(int[] partitions, String topicName, - int numConsumers, int consumerIndex) { + protected static List assignPartitions(List partitions, int numConsumers, int consumerIndex) { checkArgument(numConsumers > 0); checkArgument(consumerIndex < numConsumers); - List partitionsToSub = new ArrayList<>(); + List partitionsToSub = new ArrayList<>(); - for (int i = 0; i < partitions.length; i++) { + for (int i = 0; i < partitions.size(); i++) { if (i % numConsumers == consumerIndex) { - partitionsToSub.add(new TopicPartition(topicName, partitions[i])); + partitionsToSub.add(partitions.get(i)); } } return partitionsToSub; @@ -627,12 +606,12 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction /** * Thread to periodically commit the current 
read offset into Zookeeper. */ - private static class PeriodicOffsetCommitter extends Thread { + private static class PeriodicOffsetCommitter extends Thread { private final long commitInterval; - private final FlinkKafkaConsumer consumer; + private final FlinkKafkaConsumer consumer; private volatile boolean running = true; - public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) { + public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) { this.commitInterval = commitInterval; this.consumer = consumer; } @@ -640,31 +619,15 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction @Override public void run() { try { + while (running) { try { Thread.sleep(commitInterval); // ------------ commit current offsets ---------------- // create copy of current offsets - long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length); - - Map offsetsToCommit = new HashMap<>(); - for (TopicPartition tp : (List)consumer.subscribedPartitions) { - int partition = tp.partition(); - long offset = currentOffsets[partition]; - long lastCommitted = consumer.commitedOffsets[partition]; - - if (offset != OFFSET_NOT_SET) { - if (offset > lastCommitted) { - offsetsToCommit.put(tp, offset); - LOG.debug("Committing offset {} for partition {}", offset, partition); - } else { - LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition); - } - } - } - - consumer.offsetHandler.commit(offsetsToCommit); + HashMap currentOffsets = (HashMap) consumer.lastOffsets.clone(); + commitOffsets(currentOffsets, this.consumer); } catch (InterruptedException e) { if (running) { // throw unexpected interruption @@ -686,6 +649,40 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction } } + + /** + * Utility method to commit offsets. + * + * @param toCommit the offsets to commit + * @param consumer consumer reference + * @param message type + * @throws Exception + */ + private static void commitOffsets(HashMap toCommit, FlinkKafkaConsumer consumer) throws Exception { + Map offsetsToCommit = new HashMap<>(); + for (KafkaTopicPartitionLeader tp : consumer.subscribedPartitions) { + long offset = toCommit.get(tp.getTopicPartition()); + Long lastCommitted = consumer.committedOffsets.get(tp.getTopicPartition()); + if (lastCommitted == null) { + lastCommitted = OFFSET_NOT_SET; + } + if (offset != OFFSET_NOT_SET) { + if (offset > lastCommitted) { + offsetsToCommit.put(tp.getTopicPartition(), offset); + consumer.committedOffsets.put(tp.getTopicPartition(), offset); + LOG.debug("Committing offset {} for partition {}", offset, tp.getTopicPartition()); + } else { + LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp.getTopicPartition()); + } + } + } + + if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) { + LOG.debug("Committing offsets {} to offset store: {}", KafkaTopicPartition.toString(offsetsToCommit), consumer.offsetStore); + } + + consumer.offsetHandler.commit(offsetsToCommit); + } // ------------------------------------------------------------------------ // Kafka / ZooKeeper communication utilities @@ -694,19 +691,19 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction /** * Send request to Kafka to get partitions for topic. * - * @param topic The name of the topic. + * @param topics The name of the topics. * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
*/ - public static List getPartitionsForTopic(final String topic, final Properties properties) { + public static List getPartitionsForTopic(final List topics, final Properties properties) { String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES))); checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set"); String[] seedBrokers = seedBrokersConfString.split(","); - List partitions = new ArrayList<>(); + List partitions = new ArrayList<>(); Random rnd = new Random(); - retryLoop: for(int retry = 0; retry < numRetries; retry++) { + retryLoop: for (int retry = 0; retry < numRetries; retry++) { // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the // parallel source instances start. Still, we try all available brokers. int index = rnd.nextInt(seedBrokers.length); @@ -725,7 +722,6 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536")); consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId); - List topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); @@ -740,32 +736,24 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode())); } // warn and try more brokers - LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic, - ErrorMapping.exceptionFor(item.errorCode())); + LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " + + "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage()); continue brokersLoop; } - if (!item.topic().equals(topic)) { + if (!topics.contains(item.topic())) { LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. 
Skipping ..."); continue brokersLoop; } for (PartitionMetadata part : item.partitionsMetadata()) { Node leader = brokerToNode(part.leader()); - Node[] replicas = new Node[part.replicas().size()]; - for (int i = 0; i < part.replicas().size(); i++) { - replicas[i] = brokerToNode(part.replicas().get(i)); - } - - Node[] ISRs = new Node[part.isr().size()]; - for (int i = 0; i < part.isr().size(); i++) { - ISRs[i] = brokerToNode(part.isr().get(i)); - } - PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs); + KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId()); + KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader); partitions.add(pInfo); } } break retryLoop; // leave the loop through the brokers } catch (Exception e) { - LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e); + LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString(), e); } finally { if (consumer != null) { consumer.close(); diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java index 21f24e60c6d911a378f109f42a46f6fd2e324ada..abe33aa58abfcc302cbec41460d737bfd661e19c 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import java.util.Collections; import java.util.Properties; /** @@ -52,6 +53,6 @@ public class FlinkKafkaConsumer081 extends FlinkKafkaConsumer { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. 
*/ public FlinkKafkaConsumer081(String topic, DeserializationSchema valueDeserializer, Properties props) { - super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); + super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); } } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java index ab4a88aecba777e10b0c18dd98dc9c6a4bd3bf8d..adc42de9f752438df8f6b5472dce3933983c71d7 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import java.util.Collections; +import java.util.List; import java.util.Properties; /** @@ -47,9 +49,12 @@ public class FlinkKafkaConsumer082 extends FlinkKafkaConsumer { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ public FlinkKafkaConsumer082(String topic, DeserializationSchema valueDeserializer, Properties props) { - super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); + super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); } + + //----- key-value deserializer constructor + /** * Creates a new Kafka 0.8.2.x streaming source consumer. * @@ -64,6 +69,17 @@ public class FlinkKafkaConsumer082 extends FlinkKafkaConsumer { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. 
*/ public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema deserializer, Properties props) { - super(topic, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); + super(Collections.singletonList(topic), deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); + } + + //----- topic list constructors + + + public FlinkKafkaConsumer082(List topics, DeserializationSchema valueDeserializer, Properties props) { + super(topics, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); + } + + public FlinkKafkaConsumer082(List topics, KeyedDeserializationSchema deserializer, Properties props) { + super(topics, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); } } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index a8d913bc457dffd857426c17d4cbd1bf623c2948..7e01b54f42c50f8d47c9d5f707c423e5bdd0cdf7 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -198,13 +198,13 @@ public class FlinkKafkaProducer extends RichSinkFunction { // set the producer configuration properties. - if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); } else { LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); } - if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + if (!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); } else { LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java deleted file mode 100644 index afa2e428b5e149f8e269e78bb6e917595b96d13e..0000000000000000000000000000000000000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.api; - - -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -/** - * Sink that emits its inputs to a Kafka topic. - * - * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink. - * This class will be removed in future releases of Flink. - * - * @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead. - */ -@Deprecated -public class KafkaSink extends FlinkKafkaProducer { - public KafkaSink(String brokerList, String topicId, SerializationSchema serializationSchema) { - super(brokerList, topicId, serializationSchema); - } -} diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java deleted file mode 100644 index 2efeb20027c972f7e44204cc604dd734b18c0eba..0000000000000000000000000000000000000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.persistent; - -import kafka.consumer.ConsumerConfig; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - - -/** - * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ consumers. - * - * This class is provided as a migration path from the old Flink kafka connectors to the new, updated implemntations. - * - * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082. - * - * @param The type of elements produced by this consumer. - * - * @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the - * Kafka version specific consumers, like - * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081}, - * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc. 
- */ -@Deprecated -public class PersistentKafkaSource extends FlinkKafkaConsumer { - - private static final long serialVersionUID = -8450689820627198228L; - - /** - * Creates a new Kafka 0.8.2.x streaming source consumer. - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param consumerConfig - * The consumer config used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public PersistentKafkaSource(String topic, DeserializationSchema valueDeserializer, ConsumerConfig consumerConfig) { - super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } -} diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java index dabafa97d53155e6b8d3e39a8e2c6c0d3d0dc5db..4f1a2a69978466abff14d54ac7e645ca4e35ac85 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java @@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.internals; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.kafka.common.TopicPartition; import java.io.IOException; -import java.util.List; +import java.util.HashMap; /** * A fetcher pulls data from Kafka, from a fix set of partitions. @@ -30,16 +29,9 @@ import java.util.List; */ public interface Fetcher { - /** - * Set which partitions the fetcher should pull from. - * - * @param partitions The list of partitions for a topic that the fetcher will pull from. - */ - void setPartitionsToRead(List partitions); - /** * Closes the fetcher. This will stop any operation in the - * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually + * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually * close underlying connections and release all resources. */ void close() throws IOException; @@ -61,15 +53,14 @@ public interface Fetcher { * } * } * } - * + * + * @param The type of elements produced by the fetcher and emitted to the source context. * @param sourceContext The source context to emit elements to. * @param valueDeserializer The deserializer to decode the raw values with. - * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state) - * - * @param The type of elements produced by the fetcher and emitted to the source context. + * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state) */ void run(SourceFunction.SourceContext sourceContext, KeyedDeserializationSchema valueDeserializer, - long[] lastOffsets) throws Exception; + HashMap lastOffsets) throws Exception; /** * Set the next offset to read from for the given partition. @@ -79,7 +70,7 @@ public interface Fetcher { * @param topicPartition The partition for which to seek the offset. * @param offsetToRead To offset to seek to. 
*/ - void seek(TopicPartition topicPartition, long offsetToRead); + void seek(KafkaTopicPartition topicPartition, long offsetToRead); /** * Exit run loop with given error and release all resources. diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java new file mode 100644 index 0000000000000000000000000000000000000000..f269aa32d34f74f23e47d884e5f969148c0d6817 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A serializable representation of a kafka topic and a partition. 
+ * Used as an operator state for the Kafka consumer + */ +public class KafkaTopicPartition implements Serializable { + + private static final long serialVersionUID = 722083576322742325L; + + private final String topic; + private final int partition; + private final int cachedHash; + + public KafkaTopicPartition(String topic, int partition) { + this.topic = checkNotNull(topic); + this.partition = partition; + this.cachedHash = 31 * topic.hashCode() + partition; + } + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + @Override + public String toString() { + return "KafkaTopicPartition{" + + "topic='" + topic + '\'' + + ", partition=" + partition + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaTopicPartition)) { + return false; + } + + KafkaTopicPartition that = (KafkaTopicPartition) o; + + if (partition != that.partition) { + return false; + } + return topic.equals(that.topic); + } + + @Override + public int hashCode() { + return cachedHash; + } + + + // ------------------- Utilities ------------------------------------- + + /** + * Returns a unique list of topics from the topic partition map + * + * @param topicPartitionMap A map of KafkaTopicPartition's + * @return A unique list of topics from the input map + */ + public static List getTopics(Map topicPartitionMap) { + HashSet uniqueTopics = new HashSet<>(); + for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) { + uniqueTopics.add(ktp.getTopic()); + } + return new ArrayList<>(uniqueTopics); + } + + public static String toString(Map map) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry p: map.entrySet()) { + KafkaTopicPartition ktp = p.getKey(); + sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", "); + } + return sb.toString(); + } + + /** + * Checks whether this partition is contained in the map with KafkaTopicPartitionLeaders + * + * @param map The map of KafkaTopicPartitionLeaders + * @return true if the element is contained. + */ + public boolean isContained(Map map) { + for(Map.Entry entry : map.entrySet()) { + if(entry.getKey().getTopicPartition().equals(this)) { + return true; + } + } + return false; + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java new file mode 100644 index 0000000000000000000000000000000000000000..8dd9a52e193aca2894684e227b7918d591063748 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.kafka.common.Node; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * Serializable Topic Partition info with leader Node information. + * This class is used at runtime. + */ +public class KafkaTopicPartitionLeader implements Serializable { + + private static final long serialVersionUID = 9145855900303748582L; + + private final int leaderId; + private final int leaderPort; + private final String leaderHost; + private final KafkaTopicPartition topicPartition; + private final int cachedHash; + + public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) { + this.topicPartition = topicPartition; + if (leader == null) { + this.leaderId = -1; + this.leaderHost = null; + this.leaderPort = -1; + } else { + this.leaderId = leader.id(); + this.leaderPort = leader.port(); + this.leaderHost = leader.host(); + } + int cachedHash = (leader == null) ? 14 : leader.hashCode(); + this.cachedHash = 31 * cachedHash + topicPartition.hashCode(); + } + + public KafkaTopicPartition getTopicPartition() { + return topicPartition; + } + + public Node getLeader() { + if (this.leaderId == -1) { + return null; + } else { + return new Node(leaderId, leaderHost, leaderPort); + } + } + + public static Object toString(List partitions) { + StringBuilder sb = new StringBuilder(); + for (KafkaTopicPartitionLeader p: partitions) { + sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", "); + } + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaTopicPartitionLeader)) { + return false; + } + + KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o; + + if (!topicPartition.equals(that.topicPartition)) { + return false; + } + return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost); + } + + @Override + public int hashCode() { + return cachedHash; + } + + @Override + public String toString() { + return "KafkaTopicPartitionLeader{" + + "leaderId=" + leaderId + + ", leaderPort=" + leaderPort + + ", leaderHost='" + leaderHost + '\'' + + ", topic=" + topicPartition.getTopic() + + ", partition=" + topicPartition.getPartition() + + '}'; + } + + + /** + * Replaces an existing KafkaTopicPartition ignoring the leader in the given map. 
+ * + * @param newKey new topicpartition + * @param newValue new offset + * @param map map to do the search in + * @return oldValue the old value (offset) + */ + public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map map) { + for(Map.Entry entry: map.entrySet()) { + if(entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) { + Long oldValue = map.remove(entry.getKey()); + if(map.put(newKey, newValue) != null) { + throw new IllegalStateException("Key was not removed before"); + } + return oldValue; + } + } + return null; + } +} diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index 95683ce2c1bb4d69c8bd43157755c7aab3de88c7..4233c18d60ffe3f5ae5039a253e4be9929aa2974 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -31,18 +31,17 @@ import kafka.message.MessageAndOffset; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.StringUtils; +import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,16 +52,14 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * This fetcher uses Kafka's low-level API to pull data from a specific - * set of partitions and offsets for a certain topic. + * set of topics and partitions. * *

This code is based in part on the tutorial code for the low-level Kafka consumer.

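Before the LegacyFetcher hunks below: the fetcher now groups the subscribed partitions by their leader broker and starts one consumer thread per broker. A simplified, self-contained sketch of that grouping step, with plain strings standing in for the patch's Node and FetchPartition types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByLeaderSketch {
    public static void main(String[] args) {
        // (leader, partition) pairs as the fetcher would see them after the metadata lookup
        String[][] partitionsWithLeader = {
                {"broker-1", "topicA-0"},
                {"broker-2", "topicA-1"},
                {"broker-1", "topicB-0"},
        };

        // Group the partitions by leader; one fetch thread is later started per map entry.
        Map<String, List<String>> fetchBrokers = new HashMap<>();
        for (String[] p : partitionsWithLeader) {
            List<String> partitions = fetchBrokers.get(p[0]);
            if (partitions == null) {
                partitions = new ArrayList<>();
                fetchBrokers.put(p[0], partitions);
            }
            partitions.add(p[1]);
        }

        // e.g. {broker-1=[topicA-0, topicB-0], broker-2=[topicA-1]} (map iteration order may vary)
        System.out.println(fetchBrokers);
    }
}
```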
*/ public class LegacyFetcher implements Fetcher { - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class); - /** The topic from which this fetcher pulls data */ - private final String topic; /** The properties that configure the Kafka connection */ private final Properties config; @@ -74,7 +71,13 @@ public class LegacyFetcher implements Fetcher { private final AtomicReference error; /** The partitions that the fetcher should read, with their starting offsets */ - private Map partitionsToRead; + private Map partitionsToRead; + + /** The seek() method might receive KafkaTopicPartition's without leader information + * (for example when restoring). + * If there are elements in this list, we'll fetch the leader from Kafka. + **/ + private Map partitionsToReadWithoutLeader; /** Reference the the thread that executed the run() method. */ private volatile Thread mainThread; @@ -82,9 +85,13 @@ public class LegacyFetcher implements Fetcher { /** Flag to shot the fetcher down */ private volatile boolean running = true; - public LegacyFetcher(String topic, Properties props, String taskName) { + public LegacyFetcher(List partitions, Properties props, String taskName) { this.config = checkNotNull(props, "The config properties cannot be null"); - this.topic = checkNotNull(topic, "The topic cannot be null"); + //this.topic = checkNotNull(topic, "The topic cannot be null"); + this.partitionsToRead = new HashMap<>(); + for (KafkaTopicPartitionLeader p: partitions) { + partitionsToRead.put(p, FlinkKafkaConsumer.OFFSET_NOT_SET); + } this.taskName = taskName; this.error = new AtomicReference<>(); } @@ -94,23 +101,18 @@ public class LegacyFetcher implements Fetcher { // ------------------------------------------------------------------------ @Override - public void setPartitionsToRead(List partitions) { - partitionsToRead = new HashMap<>(partitions.size()); - for (TopicPartition tp: partitions) { - partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET); - } - } - - @Override - public void seek(TopicPartition topicPartition, long offsetToRead) { + public void seek(KafkaTopicPartition topicPartition, long offsetToRead) { if (partitionsToRead == null) { throw new IllegalArgumentException("No partitions to read set"); } - if (!partitionsToRead.containsKey(topicPartition)) { + if (!topicPartition.isContained(partitionsToRead)) { throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition + ") we are not going to read. 
Partitions to read " + partitionsToRead); } - partitionsToRead.put(topicPartition, offsetToRead); + if (partitionsToReadWithoutLeader == null) { + partitionsToReadWithoutLeader = new HashMap<>(); + } + partitionsToReadWithoutLeader.put(topicPartition, offsetToRead); } @Override @@ -124,7 +126,7 @@ public class LegacyFetcher implements Fetcher { @Override public void run(SourceFunction.SourceContext sourceContext, KeyedDeserializationSchema deserializer, - long[] lastOffsets) throws Exception { + HashMap lastOffsets) throws Exception { if (partitionsToRead == null || partitionsToRead.size() == 0) { throw new IllegalArgumentException("No partitions set"); @@ -135,54 +137,57 @@ public class LegacyFetcher implements Fetcher { this.mainThread = Thread.currentThread(); LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher"); - - // get lead broker for each partition - - // NOTE: The kafka client apparently locks itself in an infinite loop sometimes - // when it is interrupted, so we run it only in a separate thread. - // since it sometimes refuses to shut down, we resort to the admittedly harsh - // means of killing the thread after a timeout. - PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config); - infoFetcher.start(); - - KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); - watchDog.start(); - - final List allPartitionsInTopic = infoFetcher.getPartitions(); - - // brokers to fetch partitions from. - int fetchPartitionsCount = 0; - Map> fetchBrokers = new HashMap<>(); - - for (PartitionInfo partitionInfo : allPartitionsInTopic) { - if (partitionInfo.leader() == null) { - throw new RuntimeException("Unable to consume partition " + partitionInfo.partition() - + " from topic "+partitionInfo.topic()+" because it does not have a leader"); - } - - for (Map.Entry entry : partitionsToRead.entrySet()) { - final TopicPartition topicPartition = entry.getKey(); - final long offset = entry.getValue(); - - // check if that partition is for us - if (topicPartition.partition() == partitionInfo.partition()) { - List partitions = fetchBrokers.get(partitionInfo.leader()); - if (partitions == null) { - partitions = new ArrayList<>(); - fetchBrokers.put(partitionInfo.leader(), partitions); + + // get lead broker if necessary + if (partitionsToReadWithoutLeader != null && partitionsToReadWithoutLeader.size() > 0) { + LOG.info("Refreshing leader information for partitions {}", KafkaTopicPartition.toString(partitionsToReadWithoutLeader)); + // NOTE: The kafka client apparently locks itself in an infinite loop sometimes + // when it is interrupted, so we run it only in a separate thread. + // since it sometimes refuses to shut down, we resort to the admittedly harsh + // means of killing the thread after a timeout. 
+ PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(KafkaTopicPartition.getTopics(partitionsToReadWithoutLeader), config); + infoFetcher.start(); + + KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); + watchDog.start(); + + List topicPartitionWithLeaderList = infoFetcher.getPartitions(); + + // replace potentially outdated leader information in partitionsToRead with fresh data from topicPartitionWithLeader + for (Map.Entry pt: partitionsToReadWithoutLeader.entrySet()) { + KafkaTopicPartitionLeader topicPartitionWithLeader = null; + // go through list + for (KafkaTopicPartitionLeader withLeader: topicPartitionWithLeaderList) { + if (withLeader.getTopicPartition().equals(pt.getKey())) { + topicPartitionWithLeader = withLeader; + break; } - - partitions.add(new FetchPartition(topicPartition.partition(), offset)); - fetchPartitionsCount++; - } - // else this partition is not for us + if (topicPartitionWithLeader == null) { + throw new IllegalStateException("Unable to find topic/partition leader information"); + } + Long removed = KafkaTopicPartitionLeader.replaceIgnoringLeader(topicPartitionWithLeader, pt.getValue(), partitionsToRead); + if (removed == null) { + throw new IllegalStateException("Seek request on unknown topic partition"); + } } } - - if (partitionsToRead.size() != fetchPartitionsCount) { - throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only " - + fetchPartitionsCount + " partition infos with lead brokers."); + + + // build a map for each broker with its partitions + Map> fetchBrokers = new HashMap<>(); + + for (Map.Entry entry : partitionsToRead.entrySet()) { + final KafkaTopicPartitionLeader topicPartition = entry.getKey(); + final long offset = entry.getValue(); + + List partitions = fetchBrokers.get(topicPartition.getLeader()); + if (partitions == null) { + partitions = new ArrayList<>(); + fetchBrokers.put(topicPartition.getLeader(), partitions); + } + + partitions.add(new FetchPartition(topicPartition.getTopicPartition().getTopic(), topicPartition.getTopicPartition().getPartition(), offset)); } // create SimpleConsumers for each broker @@ -194,7 +199,7 @@ public class LegacyFetcher implements Fetcher { FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); - SimpleConsumerThread thread = new SimpleConsumerThread<>(this, config, topic, + SimpleConsumerThread thread = new SimpleConsumerThread<>(this, config, broker, partitions, sourceContext, deserializer, lastOffsets); thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", @@ -274,21 +279,24 @@ public class LegacyFetcher implements Fetcher { * Representation of a partition to fetch. */ private static class FetchPartition { + + final String topic; /** ID of the partition within the topic (0 indexed, as given by Kafka) */ - int partition; + final int partition; /** Offset pointing at the next element to read from that partition. 
*/ long nextOffsetToRead; - FetchPartition(int partition, long nextOffsetToRead) { + FetchPartition(String topic, int partition, long nextOffsetToRead) { + this.topic = topic; this.partition = partition; this.nextOffsetToRead = nextOffsetToRead; } @Override public String toString() { - return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}'; + return "FetchPartition {topic=" + topic +", partition=" + partition + ", offset=" + nextOffsetToRead + '}'; } } @@ -306,12 +314,12 @@ public class LegacyFetcher implements Fetcher { private final SourceFunction.SourceContext sourceContext; private final KeyedDeserializationSchema deserializer; - private final long[] offsetsState; + private final HashMap offsetsState; private final FetchPartition[] partitions; private final Node broker; - private final String topic; + private final Properties config; private final LegacyFetcher owner; @@ -323,15 +331,14 @@ public class LegacyFetcher implements Fetcher { // exceptions are thrown locally public SimpleConsumerThread(LegacyFetcher owner, - Properties config, String topic, + Properties config, Node broker, FetchPartition[] partitions, SourceFunction.SourceContext sourceContext, KeyedDeserializationSchema deserializer, - long[] offsetsState) { + HashMap offsetsState) { this.owner = owner; this.config = config; - this.topic = topic; this.broker = broker; this.partitions = partitions; this.sourceContext = checkNotNull(sourceContext); @@ -341,6 +348,7 @@ public class LegacyFetcher implements Fetcher { @Override public void run() { + LOG.info("Starting to fetch from {}", Arrays.toString(this.partitions)); try { // set up the config values final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); @@ -368,14 +376,13 @@ public class LegacyFetcher implements Fetcher { } } if (partitionsToGetOffsetsFor.size() > 0) { - getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); - LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}", - topic, partitionsToGetOffsetsFor); + getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); + LOG.info("No prior offsets found for some partitions. 
Fetched the following start offsets {}", partitionsToGetOffsetsFor); } } // Now, the actual work starts :-) - int OffsetOutOfRangeCount = 0; + int offsetOutOfRangeCount = 0; while (running) { FetchRequestBuilder frb = new FetchRequestBuilder(); frb.clientId(clientId); @@ -383,38 +390,37 @@ public class LegacyFetcher implements Fetcher { frb.minBytes(minBytes); for (FetchPartition fp : partitions) { - frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize); + frb.addFetch(fp.topic, fp.partition, fp.nextOffsetToRead, fetchSize); } kafka.api.FetchRequest fetchRequest = frb.build(); LOG.debug("Issuing fetch request {}", fetchRequest); - FetchResponse fetchResponse; - fetchResponse = consumer.fetch(fetchRequest); + FetchResponse fetchResponse = consumer.fetch(fetchRequest); if (fetchResponse.hasError()) { String exception = ""; List partitionsToGetOffsetsFor = new ArrayList<>(); for (FetchPartition fp : partitions) { - short code = fetchResponse.errorCode(topic, fp.partition); + short code = fetchResponse.errorCode(fp.topic, fp.partition); - if(code == ErrorMapping.OffsetOutOfRangeCode()) { + if (code == ErrorMapping.OffsetOutOfRangeCode()) { // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' partitionsToGetOffsetsFor.add(fp); - } else if(code != ErrorMapping.NoError()) { + } else if (code != ErrorMapping.NoError()) { exception += "\nException for partition " + fp.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); } } if (partitionsToGetOffsetsFor.size() > 0) { // safeguard against an infinite loop. - if(OffsetOutOfRangeCount++ > 0) { + if (offsetOutOfRangeCount++ > 0) { throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " + "Exceptions: "+exception); } // get valid offsets for these partitions and try again. LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); - getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); + getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); continue; // jump back to create a new fetch request. The offset has not been touched. 
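Note: with the topic name now carried by each FetchPartition, a single fetch request sent to one broker can span partitions of different topics. A rough sketch of such a request against Kafka's 0.8 SimpleConsumer API follows; broker address, client id, wait/size values and topic names are placeholders, not values from this patch.

    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class MultiTopicFetchSketch {
        public static void main(String[] args) {
            SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 30_000, 64 * 1024, "example-client");

            // One fetch request covering partitions of two different topics.
            kafka.api.FetchRequest request = new FetchRequestBuilder()
                .clientId("example-client")
                .maxWait(100)
                .minBytes(1)
                .addFetch("topic-a", 0, 0L, 1024 * 1024)
                .addFetch("topic-b", 3, 0L, 1024 * 1024)
                .build();

            FetchResponse response = consumer.fetch(request);
            System.out.println("has error: " + response.hasError());
            consumer.close();
        }
    }
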
} else { @@ -424,9 +430,10 @@ public class LegacyFetcher implements Fetcher { } int messagesInFetch = 0; + int deletedMessages = 0; for (FetchPartition fp : partitions) { - final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition); - final int partition = fp.partition; + final ByteBufferMessageSet messageSet = fetchResponse.messageSet(fp.topic, fp.partition); + final KafkaTopicPartition topicPartition = new KafkaTopicPartition(fp.topic, fp.partition); for (MessageAndOffset msg : messageSet) { if (running) { @@ -439,8 +446,19 @@ public class LegacyFetcher implements Fetcher { continue; } + final long offset = msg.offset(); + // put value into byte array ByteBuffer payload = msg.message().payload(); + if (payload == null) { + // This message has no value (which means it has been deleted from the Kafka topic) + deletedMessages++; + // advance offset in state to avoid re-reading the message + synchronized (sourceContext.getCheckpointLock()) { + offsetsState.put(topicPartition, offset); + } + continue; + } byte[] valueBytes = new byte[payload.remaining()]; payload.get(valueBytes); @@ -454,12 +472,10 @@ public class LegacyFetcher implements Fetcher { keyPayload.get(keyBytes); } - final long offset = msg.offset(); - final T value = deserializer.deserialize(keyBytes, valueBytes, offset); - + final T value = deserializer.deserialize(keyBytes, valueBytes, fp.topic, offset); synchronized (sourceContext.getCheckpointLock()) { sourceContext.collect(value); - offsetsState[partition] = offset; + offsetsState.put(topicPartition, offset); } // advance offset for the next request @@ -471,7 +487,7 @@ public class LegacyFetcher implements Fetcher { } } } - LOG.debug("This fetch contained {} messages", messagesInFetch); + LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); } } catch (Throwable t) { @@ -510,15 +526,14 @@ public class LegacyFetcher implements Fetcher { * Request latest offsets for a set of partitions, via a Kafka consumer. * * @param consumer The consumer connected to lead broker - * @param topic The topic name * @param partitions The list of partitions we need offsets for * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest) */ - private static void getLastOffset(SimpleConsumer consumer, String topic, List partitions, long whichTime) { + private static void getLastOffset(SimpleConsumer consumer, List partitions, long whichTime) { Map requestInfo = new HashMap<>(); for (FetchPartition fp: partitions) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition); + TopicAndPartition topicAndPartition = new TopicAndPartition(fp.topic, fp.partition); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); } @@ -529,18 +544,17 @@ public class LegacyFetcher implements Fetcher { String exception = ""; for (FetchPartition fp: partitions) { short code; - if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) { + if ( (code=response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) { exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); } } - throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions - + ". " + exception); + throw new RuntimeException("Unable to get last offset for partitions " + partitions + ". 
" + exception); } for (FetchPartition fp: partitions) { // the resulting offset is the next offset we are going to read // for not-yet-consumed partitions, it is 0. - fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0]; + fp.nextOffsetToRead = response.offsets(fp.topic, fp.partition)[0]; } } @@ -554,41 +568,42 @@ public class LegacyFetcher implements Fetcher { return timeType; } } - + + private static class PartitionInfoFetcher extends Thread { - private final String topic; + private final List topics; private final Properties properties; - - private volatile List result; + + private volatile List result; private volatile Throwable error; - - PartitionInfoFetcher(String topic, Properties properties) { - this.topic = topic; + + PartitionInfoFetcher(List topics, Properties properties) { + this.topics = topics; this.properties = properties; } @Override public void run() { try { - result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties); + result = FlinkKafkaConsumer.getPartitionsForTopic(topics, properties); } catch (Throwable t) { this.error = t; } } - - public List getPartitions() throws Exception { + + public List getPartitions() throws Exception { try { this.join(); } catch (InterruptedException e) { throw new Exception("Partition fetching was cancelled before completion"); } - + if (error != null) { - throw new Exception("Failed to fetch partitions for topic " + topic, error); + throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error); } if (result != null) { return result; @@ -598,14 +613,14 @@ public class LegacyFetcher implements Fetcher { } private static class KillerWatchDog extends Thread { - + private final Thread toKill; private final long timeout; private KillerWatchDog(Thread toKill, long timeout) { super("KillerWatchDog"); setDaemon(true); - + this.toKill = toKill; this.timeout = timeout; } @@ -615,7 +630,7 @@ public class LegacyFetcher implements Fetcher { public void run() { final long deadline = System.currentTimeMillis() + timeout; long now; - + while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) { try { toKill.join(deadline - now); @@ -624,7 +639,7 @@ public class LegacyFetcher implements Fetcher { // ignore here, our job is important! } } - + // this is harsh, but this watchdog is a last resort if (toKill.isAlive()) { toKill.stop(); diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java index 2a825616c6cc4772913d2ff90d11a43c04bdafbe..fdd89c675e0ca8378862ee3cb1990c0f6f5907b2 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java @@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka.internals; -import org.apache.kafka.common.TopicPartition; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -38,7 +36,7 @@ public interface OffsetHandler { * * @param offsetsToCommit The offset to commit, per partition. 
*/ - void commit(Map offsetsToCommit) throws Exception; + void commit(Map offsetsToCommit) throws Exception; /** * Positions the given fetcher to the initial read offsets where the stream consumption @@ -47,7 +45,7 @@ public interface OffsetHandler { * @param partitions The partitions for which to seeks the fetcher to the beginning. * @param fetcher The fetcher that will pull data from Kafka and must be positioned. */ - void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) throws Exception; + void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) throws Exception; /** * Closes the offset handler, releasing all resources. diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index 42a5951d7fce65c4a57845b738ebdcb947ebe928..f9b8448a4e54765ac074a0c2a8a8cfb23ee187ce 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -25,7 +25,6 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.TopicPartition; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -71,28 +70,28 @@ public class ZookeeperOffsetHandler implements OffsetHandler { @Override - public void commit(Map offsetsToCommit) { - for (Map.Entry entry : offsetsToCommit.entrySet()) { - TopicPartition tp = entry.getKey(); + public void commit(Map offsetsToCommit) { + for (Map.Entry entry : offsetsToCommit.entrySet()) { + KafkaTopicPartition tp = entry.getKey(); long offset = entry.getValue(); if (offset >= 0) { - setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset); + setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset); } } } @Override - public void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) { - for (TopicPartition tp : partitions) { - long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition()); + public void seekFetcherToInitialOffsets(List partitions, Fetcher fetcher) { + for (KafkaTopicPartitionLeader tp : partitions) { + long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition()); if (offset != OFFSET_NOT_SET) { LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.", - tp.partition(), offset); + tp.getTopicPartition().getPartition(), offset); // the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset. 
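Note: the comment above is easy to miss: ZooKeeper stores the last offset that was read, while Fetcher.seek() expects the next offset to read, hence the "+ 1" on restore. A toy illustration of that convention, with a plain map standing in for the ZooKeeper offset node:

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetConventionSketch {

        public static void main(String[] args) {
            // stands in for the per-group offset node in ZooKeeper
            Map<String, Long> committed = new HashMap<>();

            long lastReadOffset = 41L;
            committed.put("topic-a/0", lastReadOffset);   // commit: last offset read

            long restored = committed.get("topic-a/0");
            long seekTo = restored + 1;                   // restore: next offset to read
            System.out.println("fetcher.seek(topic-a/0, " + seekTo + ")");
        }
    }
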
- fetcher.seek(tp, offset + 1); + fetcher.seek(tp.getTopicPartition(), offset + 1); } } } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java index 7ab72909dbfaa2256a084e32285196a50d4af4d0..61735e98d958789afa83502114aa8c65049436a2 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -59,12 +59,12 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable { @Override public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { int p = 0; - for(int i = 0; i < parallelInstances; i++) { - if(i == parallelInstanceId) { + for (int i = 0; i < parallelInstances; i++) { + if (i == parallelInstanceId) { targetPartition = partitions[p]; return; } - if(++p == partitions.length) { + if (++p == partitions.length) { p = 0; } } @@ -72,7 +72,7 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable { @Override public int partition(Object element, int numPartitions) { - if(targetPartition == -1) { + if (targetPartition == -1) { throw new RuntimeException("The partitioner has not been initialized properly"); } return targetPartition; diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java index 3d392aa16819f1ebc1bcd7325165cfd24db63d22..c4b026b4747f0d875c7b64081e124c9cc20fe4b9 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java @@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.kafka.common.TopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.kafka.common.Node; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -30,23 +32,30 @@ import java.util.Set; import static org.junit.Assert.*; + /** * Tests that the partition assignment is deterministic and stable. 
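Note: the tests that follow only assert properties of FlinkKafkaConsumer.assignPartitions: an even spread across consumers, determinism, and stability when new partitions are appended. A simplified model with those properties is sketched below for intuition; it is not the connector's actual implementation, and the real method may differ in details.

    import java.util.ArrayList;
    import java.util.List;

    public class AssignmentModel {

        // Deterministic, index-based assignment: appending partitions never
        // moves an already-assigned partition to a different consumer.
        static List<String> assign(List<String> partitions, int numConsumers, int consumerIndex) {
            List<String> mine = new ArrayList<>();
            for (int i = 0; i < partitions.size(); i++) {
                if (i % numConsumers == consumerIndex) {
                    mine.add(partitions.get(i));
                }
            }
            return mine;
        }

        public static void main(String[] args) {
            List<String> partitions = new ArrayList<>();
            for (int p : new int[] {4, 52, 17, 1, 2, 3, 89}) {
                partitions.add("test-topic/" + p);
            }
            for (int i = 0; i < 3; i++) {
                System.out.println("consumer " + i + " -> " + assign(partitions, 3, i));
            }
        }
    }
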
*/ public class KafkaConsumerPartitionAssignmentTest { + private final Node fake = new Node(1337, "localhost", 1337); + @Test public void testPartitionsEqualConsumers() { try { - int[] partitions = {4, 52, 17, 1}; - - for (int i = 0; i < partitions.length; i++) { - List parts = FlinkKafkaConsumer.assignPartitions( - partitions, "test-topic", partitions.length, i); - + List inPartitions = new ArrayList<>(); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake)); + + for (int i = 0; i < inPartitions.size(); i++) { + List parts = FlinkKafkaConsumer.assignPartitions( + inPartitions, inPartitions.size(), i); + assertNotNull(parts); assertEquals(1, parts.size()); - assertTrue(contains(partitions, parts.get(0).partition())); + assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition())); } } catch (Exception e) { @@ -55,31 +64,43 @@ public class KafkaConsumerPartitionAssignmentTest { } } + private boolean contains(List inPartitions, int partition) { + for (KafkaTopicPartitionLeader ktp: inPartitions) { + if (ktp.getTopicPartition().getPartition() == partition) { + return true; + } + } + return false; + } + @Test public void testMultiplePartitionsPerConsumers() { try { - final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + + final List partitions = new ArrayList<>(); + final Set allPartitions = new HashSet<>(); - final Set allPartitions = new HashSet<>(); - for (int i : partitions) { - allPartitions.add(i); + for (int p : partitionIDs) { + KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake); + partitions.add(part); + allPartitions.add(part); } - + final int numConsumers = 3; - final int minPartitionsPerConsumer = partitions.length / numConsumers; - final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1; - + final int minPartitionsPerConsumer = partitions.size() / numConsumers; + final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1; + for (int i = 0; i < numConsumers; i++) { - List parts = FlinkKafkaConsumer.assignPartitions( - partitions, "test-topic", numConsumers, i); + List parts = FlinkKafkaConsumer.assignPartitions(partitions, numConsumers, i); assertNotNull(parts); assertTrue(parts.size() >= minPartitionsPerConsumer); assertTrue(parts.size() <= maxPartitionsPerConsumer); - for (TopicPartition p : parts) { + for (KafkaTopicPartitionLeader p : parts) { // check that the element was actually contained - assertTrue(allPartitions.remove(p.partition())); + assertTrue(allPartitions.remove(p)); } } @@ -95,25 +116,26 @@ public class KafkaConsumerPartitionAssignmentTest { @Test public void testPartitionsFewerThanConsumers() { try { - final int[] partitions = {4, 52, 17, 1}; + List inPartitions = new ArrayList<>(); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 1), fake)); - final Set allPartitions = new HashSet<>(); - for (int i : partitions) { - allPartitions.add(i); - } + final Set allPartitions = new HashSet<>(); + allPartitions.addAll(inPartitions); + + final int numConsumers = 2 * inPartitions.size() + 3; - final int numConsumers = 2 * partitions.length + 3; - for (int i = 0; i < numConsumers; i++) { - List parts = FlinkKafkaConsumer.assignPartitions( - partitions, "test-topic", numConsumers, i); + List parts = FlinkKafkaConsumer.assignPartitions(inPartitions, numConsumers, i); assertNotNull(parts); assertTrue(parts.size() <= 1); - - for (TopicPartition p : parts) { + + for (KafkaTopicPartitionLeader p : parts) { // check that the element was actually contained - assertTrue(allPartitions.remove(p.partition())); + assertTrue(allPartitions.remove(p)); } } @@ -125,15 +147,16 @@ public class KafkaConsumerPartitionAssignmentTest { fail(e.getMessage()); } } - + @Test public void testAssignEmptyPartitions() { try { - List parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2); + List ep = new ArrayList<>(); + List parts1 = FlinkKafkaConsumer.assignPartitions(ep, 4, 2); assertNotNull(parts1); assertTrue(parts1.isEmpty()); - List parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0); + List parts2 = FlinkKafkaConsumer.assignPartitions(ep, 1, 0); assertNotNull(parts2); assertTrue(parts2.isEmpty()); } @@ -146,35 +169,36 @@ public class KafkaConsumerPartitionAssignmentTest { @Test public void testGrowingPartitionsRemainsStable() { try { - final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; - final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7); + final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + List newPartitions = new ArrayList<>(); - final Set allNewPartitions = new HashSet<>(); - final Set allInitialPartitions = new HashSet<>(); - for (int i : newPartitions) { - allNewPartitions.add(i); - } - for (int i : initialPartitions) { - allInitialPartitions.add(i); + for (int p : newPartitionIDs) { + KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake); + newPartitions.add(part); } + List initialPartitions = newPartitions.subList(0, 7); + + final Set allNewPartitions = new HashSet<>(newPartitions); + final Set allInitialPartitions = new HashSet<>(initialPartitions); + final int numConsumers = 3; - final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers; - final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1; - final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers; - final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1; - - List parts1 = FlinkKafkaConsumer.assignPartitions( - initialPartitions, "test-topic", numConsumers, 0); - List parts2 = FlinkKafkaConsumer.assignPartitions( - initialPartitions, "test-topic", numConsumers, 1); - List parts3 = FlinkKafkaConsumer.assignPartitions( - initialPartitions, "test-topic", numConsumers, 2); + final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers; + final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1; + final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers; + final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1; + + List parts1 = FlinkKafkaConsumer.assignPartitions( + initialPartitions, 
numConsumers, 0); + List parts2 = FlinkKafkaConsumer.assignPartitions( + initialPartitions, numConsumers, 1); + List parts3 = FlinkKafkaConsumer.assignPartitions( + initialPartitions, numConsumers, 2); assertNotNull(parts1); assertNotNull(parts2); assertNotNull(parts3); - + assertTrue(parts1.size() >= minInitialPartitionsPerConsumer); assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer); assertTrue(parts2.size() >= minInitialPartitionsPerConsumer); @@ -182,37 +206,37 @@ public class KafkaConsumerPartitionAssignmentTest { assertTrue(parts3.size() >= minInitialPartitionsPerConsumer); assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer); - for (TopicPartition p : parts1) { + for (KafkaTopicPartitionLeader p : parts1) { // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p.partition())); + assertTrue(allInitialPartitions.remove(p)); } - for (TopicPartition p : parts2) { + for (KafkaTopicPartitionLeader p : parts2) { // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p.partition())); + assertTrue(allInitialPartitions.remove(p)); } - for (TopicPartition p : parts3) { + for (KafkaTopicPartitionLeader p : parts3) { // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p.partition())); + assertTrue(allInitialPartitions.remove(p)); } - + // all partitions must have been assigned assertTrue(allInitialPartitions.isEmpty()); - + // grow the set of partitions and distribute anew - - List parts1new = FlinkKafkaConsumer.assignPartitions( - newPartitions, "test-topic", numConsumers, 0); - List parts2new = FlinkKafkaConsumer.assignPartitions( - newPartitions, "test-topic", numConsumers, 1); - List parts3new = FlinkKafkaConsumer.assignPartitions( - newPartitions, "test-topic", numConsumers, 2); + + List parts1new = FlinkKafkaConsumer.assignPartitions( + newPartitions, numConsumers, 0); + List parts2new = FlinkKafkaConsumer.assignPartitions( + newPartitions, numConsumers, 1); + List parts3new = FlinkKafkaConsumer.assignPartitions( + newPartitions, numConsumers, 2); // new partitions must include all old partitions - + assertTrue(parts1new.size() > parts1.size()); assertTrue(parts2new.size() > parts2.size()); assertTrue(parts3new.size() > parts3.size()); - + assertTrue(parts1new.containsAll(parts1)); assertTrue(parts2new.containsAll(parts2)); assertTrue(parts3new.containsAll(parts3)); @@ -224,17 +248,17 @@ public class KafkaConsumerPartitionAssignmentTest { assertTrue(parts3new.size() >= minNewPartitionsPerConsumer); assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer); - for (TopicPartition p : parts1new) { + for (KafkaTopicPartitionLeader p : parts1new) { // check that the element was actually contained - assertTrue(allNewPartitions.remove(p.partition())); + assertTrue(allNewPartitions.remove(p)); } - for (TopicPartition p : parts2new) { + for (KafkaTopicPartitionLeader p : parts2new) { // check that the element was actually contained - assertTrue(allNewPartitions.remove(p.partition())); + assertTrue(allNewPartitions.remove(p)); } - for (TopicPartition p : parts3new) { + for (KafkaTopicPartitionLeader p : parts3new) { // check that the element was actually contained - assertTrue(allNewPartitions.remove(p.partition())); + assertTrue(allNewPartitions.remove(p)); } // all partitions must have been assigned @@ -245,13 +269,5 @@ public class KafkaConsumerPartitionAssignmentTest { fail(e.getMessage()); } } - - private static boolean contains(int[] array, int value) { - for (int i 
: array) { - if (i == value) { - return true; - } - } - return false; - } + } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java index ec7db42e7cec8b30cd87ff0f2514c8ee08a24624..efae9228acfa43f2b1678331338d812b8b6be686 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java @@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.commons.collections.map.LinkedMap; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.Ignore; import org.junit.Test; import java.lang.reflect.Field; -import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.*; @@ -82,37 +85,45 @@ public class KafkaConsumerTest { Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets"); Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running"); Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints"); - + offsetsField.setAccessible(true); runningField.setAccessible(true); mapField.setAccessible(true); FlinkKafkaConsumer consumer = mock(FlinkKafkaConsumer.class); when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); - - long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 }; + + + HashMap testOffsets = new HashMap<>(); + long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 }; + int j = 0; + for (long i: offsets) { + KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++); + testOffsets.put(ktp, i); + } + LinkedMap map = new LinkedMap(); - + offsetsField.set(consumer, testOffsets); runningField.set(consumer, true); mapField.set(consumer, map); - + assertTrue(map.isEmpty()); // make multiple checkpoints for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) { - long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId); - assertArrayEquals(testOffsets, checkpoint); - + HashMap checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId); + assertEquals(testOffsets, checkpoint); + // change the offsets, make sure the snapshot did not change - long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length); - - for (int i = 0; i < testOffsets.length; i++) { - testOffsets[i] += 1L; + HashMap checkpointCopy = (HashMap) checkpoint.clone(); + + for (Map.Entry e: testOffsets.entrySet()) { + testOffsets.put(e.getKey(), e.getValue() + 1); } - - assertArrayEquals(checkpointCopy, checkpoint); - + + assertEquals(checkpointCopy, checkpoint); + assertTrue(map.size() > 0); assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS); } @@ -132,7 +143,7 @@ public class KafkaConsumerTest { props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); props.setProperty("group.id", "non-existent-group"); - new FlinkKafkaConsumer<>("no op topic", new 
SimpleStringSchema(), props, + new FlinkKafkaConsumer<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props, FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER, FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL); } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 2116c015880e716fa87d6ed2f16b30a52f08d1b7..044680e978aa2930b40bc8d323ab2e2ebbecfe79 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -32,12 +32,16 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.commons.collections.map.LinkedMap; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; @@ -52,6 +56,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; @@ -74,6 +80,7 @@ import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.Assert; @@ -87,6 +94,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -94,7 +102,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -148,8 +155,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { stream.print(); see.execute("No broker test"); } catch(RuntimeException re){ - Assert.assertTrue("Wrong 
RuntimeException thrown", - re.getMessage().contains("Unable to retrieve any partitions for topic")); + Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re), + re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]")); } } /** @@ -166,19 +173,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Assert.assertEquals(0, pendingCheckpoints.size()); source.setRuntimeContext(new MockRuntimeContext(1, 0)); - final long[] initialOffsets = new long[] { 1337 }; + final HashMap initialOffsets = new HashMap<>(); + initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L); // first restore source.restoreState(initialOffsets); // then open source.open(new Configuration()); - long[] state1 = source.snapshotState(1, 15); + HashMap state1 = source.snapshotState(1, 15); - assertArrayEquals(initialOffsets, state1); + assertEquals(initialOffsets, state1); + + HashMap state2 = source.snapshotState(2, 30); + Assert.assertEquals(initialOffsets, state2); - long[] state2 = source.snapshotState(2, 30); - Assert.assertArrayEquals(initialOffsets, state2); Assert.assertEquals(2, pendingCheckpoints.size()); source.notifyCheckpointComplete(1); @@ -772,6 +781,92 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { deleteTestTopic(topic); } + public void runConsumeMultipleTopics() throws java.lang.Exception { + final int NUM_TOPICS = 5; + final int NUM_ELEMENTS = 20; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + + // create topics with content + final List topics = new ArrayList<>(); + for (int i = 0; i < NUM_TOPICS; i++) { + final String topic = "topic-" + i; + topics.add(topic); + // create topic + createTestTopic(topic, i + 1 /*partitions*/, 1); + + // write something + writeSequence(env, topic, NUM_ELEMENTS, i + 1); + } + + // validate getPartitionsForTopic method + List topicPartitions = FlinkKafkaConsumer082.getPartitionsForTopic(topics, standardProps); + Assert.assertEquals((NUM_TOPICS * (NUM_TOPICS + 1))/2, topicPartitions.size()); + + KeyedDeserializationSchema> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig()); + DataStreamSource> stream = env.addSource(new FlinkKafkaConsumer082<>(topics, readSchema, standardProps)); + + stream.flatMap(new FlatMapFunction, Integer>() { + Map countPerTopic = new HashMap<>(NUM_TOPICS); + @Override + public void flatMap(Tuple3 value, Collector out) throws Exception { + Integer count = countPerTopic.get(value.f2); + if (count == null) { + count = 1; + } else { + count++; + } + countPerTopic.put(value.f2, count); + + // check map: + for (Map.Entry el: countPerTopic.entrySet()) { + if (el.getValue() < NUM_ELEMENTS) { + break; // not enough yet + } + if (el.getValue() > NUM_ELEMENTS) { + throw new RuntimeException("There is a failure in the test. 
I've read " + + el.getValue() + " from topic " + el.getKey()); + } + } + // we've seen messages from all topics + throw new SuccessException(); + } + }).setParallelism(1); + + tryExecute(env, "Count elements from the topics"); + + + // delete all topics again + for (int i = 0; i < NUM_TOPICS; i++) { + final String topic = "topic-" + i; + deleteTestTopic(topic); + } + } + + private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema> { + + TypeSerializer ts; + public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) { + ts = TypeInfoParser.parse("Tuple2").createSerializer(ec); + } + + @Override + public Tuple3 deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException { + Tuple2 t2 = (Tuple2) ts.deserialize(new ByteArrayInputView(message)); + return new Tuple3<>(t2.f0, t2.f1, topic); + } + + @Override + public boolean isEndOfStream(Tuple3 nextElement) { + return false; + } + + @Override + public TypeInformation> getProducedType() { + return TypeInfoParser.parse("Tuple3"); + } + } + /** * Test Flink's Kafka integration also with very big records (30MB) * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message @@ -816,13 +911,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { elCnt++; if (value.f0 == -1) { // we should have seen 11 elements now. - if(elCnt == 11) { + if (elCnt == 11) { throw new SuccessException(); } else { throw new RuntimeException("There have been "+elCnt+" elements"); } } - if(elCnt > 10) { + if (elCnt > 10) { throw new RuntimeException("More than 10 elements seen: "+elCnt); } } @@ -965,7 +1060,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { @Override public void run(SourceContext> ctx) throws Exception { Random rnd = new Random(1337); - for(long i = 0; i < ELEMENT_COUNT; i++) { + for (long i = 0; i < ELEMENT_COUNT; i++) { PojoValue pojo = new PojoValue(); pojo.when = new Date(rnd.nextLong()); pojo.lon = rnd.nextLong(); @@ -1002,13 +1097,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { public void flatMap(Tuple2 value, Collector out) throws Exception { // the elements should be in order. Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter ); - if(value.f1.lat % 2 == 0) { + if (value.f1.lat % 2 == 0) { Assert.assertNull("key was not null", value.f0); } else { Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter); } counter++; - if(counter == ELEMENT_COUNT) { + if (counter == ELEMENT_COUNT) { // we got the right number of elements throw new SuccessException(); } @@ -1083,6 +1178,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception { + LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n==================================="); TypeInformation> resultType = TypeInfoParser.parse("Tuple2"); DataStream> stream = env.addSource(new RichParallelSourceFunction>() { @@ -1130,14 +1226,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // will see each message only once. 
Map topicCountMap = Collections.singletonMap(topicName, 1); Map>> streams = consumerConnector.createMessageStreams(topicCountMap); - if(streams.size() != 1) { + if (streams.size() != 1) { throw new RuntimeException("Expected only one message stream but got "+streams.size()); } List> kafkaStreams = streams.get(topicName); - if(kafkaStreams == null) { + if (kafkaStreams == null) { throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString()); } - if(kafkaStreams.size() != 1) { + if (kafkaStreams.size() != 1) { throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams"); } LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); @@ -1148,7 +1244,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { while(iteratorToRead.hasNext()) { read++; result.add(iteratorToRead.next()); - if(read == stopAfter) { + if (read == stopAfter) { LOG.info("Read "+read+" elements"); return result; } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 5f2cdbca7192d19b2295f115ff99917e0816937e..20846cfc547a9324f26b6f13d25ae9fdd6893584 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -118,4 +118,10 @@ public class KafkaITCase extends KafkaConsumerTestBase { public void testBigRecordJob() throws Exception { runBigRecordTestTopology(); } + + @Test + public void testMultipleTopics() throws Exception { + runConsumeMultipleTopics(); + } + } diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java index 5001364402927c5e4f636d37cbe9001807ce91dd..f4c1899ba0dd8a801a0dec40c811a46970e186f8 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.serialization.TypeInformationSerializatio import org.junit.Test; import java.io.Serializable; +import java.util.Collections; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -109,7 +110,7 @@ public class KafkaProducerITCase extends KafkaTestBase { // ------ consuming topology --------- FlinkKafkaConsumer> source = - new FlinkKafkaConsumer<>(topic, deserSchema, standardProps, + new FlinkKafkaConsumer<>(Collections.singletonList(topic), deserSchema, standardProps, FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER, FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL); diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 
d5117969630b0d0ecf470fb00c31b968e27c272e..e6e179cf36b0cad94f8964ef4ed2d5e911407a58 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException; import org.apache.flink.test.util.ForkableFlinkMiniCluster; @@ -54,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.net.BindException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -360,7 +362,7 @@ public abstract class KafkaTestBase extends TestLogger { catch (InterruptedException e) { // restore interrupted state } - List partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps); + List partitions = FlinkKafkaConsumer.getPartitionsForTopic(Collections.singletonList(topic), standardProps); if (partitions != null && partitions.size() > 0) { return; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java index 6a20e4462ebaef553e65fd87c7a2e89e90bab5fa..917c2330801be4676bfd14140d23a982695c48ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java @@ -39,7 +39,7 @@ public interface KeyedDeserializationSchema extends Serializable, ResultTypeQ * @param offset the offset of the message in the original source (for example the Kafka offset) * @return The deserialized message as an object. */ - T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException; + T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException; /** * Method to decide whether the element signals the end of the stream. 
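Note: the signature change above means every KeyedDeserializationSchema implementation now also receives the source topic. A minimal example schema written against the new deserialize(messageKey, message, topic, offset) signature, which simply prefixes each value with the topic it came from:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    public class TopicPrefixingSchema implements KeyedDeserializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic, long offset)
                throws IOException {
            // messageKey may be null; only the value and the topic are used here
            return topic + ": " + new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false; // never treat a data element as end-of-stream
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
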
If diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java index fc7bd1e2af1dff142c291dd99448bf2997b202f6..8d9cf5dcee2dc294419886f52f1dce78f7275950 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java @@ -35,7 +35,7 @@ public class KeyedDeserializationSchemaWrapper implements KeyedDeserializatio this.deserializationSchema = deserializationSchema; } @Override - public T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException { + public T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException { return deserializationSchema.deserialize(message); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index 1c8efd5637ea4b8e6e71884a604c64965e48baee..ef9cde5bb94c116095b09da8b5de7acde9c65c0e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -79,7 +79,7 @@ public class TypeInformationKeyValueSerializationSchema implements KeyedDe @Override - public Tuple2 deserialize(byte[] messageKey, byte[] message, long offset) throws IOException { + public Tuple2 deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException { K key = null; if(messageKey != null) { key = keySerializer.deserialize(new ByteArrayInputView(messageKey)); diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index 0b20b83ee613ae4a4a994d302635d66654f0f6d6..a558116a2ad0f7e4264749a2a1d0e0e7c5be4ac9 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -71,6 +71,11 @@ under the License. + + + + +