Commit fc8be1ca authored by Robert Metzger

[FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer instance

This closes #1437
Parent 4dbb10f5
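
For illustration, here is a minimal usage sketch of the feature this commit introduces, based on the FlinkKafkaConsumer082 constructors shown below. The topic names, broker/ZooKeeper addresses and group id are placeholders, not part of the commit:

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicConsumerExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder addresses
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		// Before this change: one consumer instance per topic.
		DataStream<String> single = env.addSource(
				new FlinkKafkaConsumer082<>("topic-a", new SimpleStringSchema(), props));

		// With this change: a single consumer instance reading from several topics.
		DataStream<String> multi = env.addSource(
				new FlinkKafkaConsumer082<>(Arrays.asList("topic-a", "topic-b", "topic-c"),
						new SimpleStringSchema(), props));

		single.print();
		multi.print();
		env.execute("Multi-topic consumer example");
	}
}
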
......@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.DataSet;
......
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Collections;
import java.util.Properties;
/**
......@@ -52,6 +53,6 @@ public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
}
......@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
......@@ -47,9 +49,12 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
//----- key-value deserializer constructor
/**
* Creates a new Kafka 0.8.2.x streaming source consumer.
*
......@@ -64,6 +69,17 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topic, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
super(Collections.singletonList(topic), deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
//----- topic list constructors
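/**
 * Creates a new Kafka 0.8.2.x streaming source consumer that reads from multiple topics.
 *
 * @param topics
 *           The Kafka topics to read from.
 * @param valueDeserializer
 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
 * @param props
 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 */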
public FlinkKafkaConsumer082(List<String> topics, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topics, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
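/**
 * Creates a new Kafka 0.8.2.x streaming source consumer that reads from multiple topics,
 * using a deserialization schema that also sees the message key, topic and offset.
 *
 * @param topics
 *           The Kafka topics to read from.
 * @param deserializer
 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
 * @param props
 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 */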
public FlinkKafkaConsumer082(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
}
......@@ -198,13 +198,13 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
// set the producer configuration properties.
if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
if (!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.api;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
/**
* Sink that emits its inputs to a Kafka topic.
*
* The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
* This class will be removed in future releases of Flink.
*
* @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
*/
@Deprecated
public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
public KafkaSink(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
super(brokerList, topicId, serializationSchema);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.api.persistent;
import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
/**
* Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ consumers.
*
* This class is provided as a migration path from the old Flink Kafka connectors to the new, updated implementations.
*
* Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
*
* @param <T> The type of elements produced by this consumer.
*
* @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the
* Kafka version specific consumers, like
* {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081},
* {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
*/
@Deprecated
public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
private static final long serialVersionUID = -8450689820627198228L;
/**
* Creates a new Kafka 0.8.2.x streaming source consumer.
*
* @param topic
* The name of the topic that should be consumed.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param consumerConfig
* The consumer config used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
}
......@@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.util.List;
import java.util.HashMap;
/**
* A fetcher pulls data from Kafka, from a fixed set of partitions.
......@@ -30,16 +29,9 @@ import java.util.List;
*/
public interface Fetcher {
/**
* Set which partitions the fetcher should pull from.
*
* @param partitions The list of partitions for a topic that the fetcher will pull from.
*/
void setPartitionsToRead(List<TopicPartition> partitions);
/**
* Closes the fetcher. This will stop any operation in the
* {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
* {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually
* close underlying connections and release all resources.
*/
void close() throws IOException;
......@@ -61,15 +53,14 @@ public interface Fetcher {
* }
* }
* }</pre>
*
*
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
* @param sourceContext The source context to emit elements to.
* @param valueDeserializer The deserializer to decode the raw values with.
* @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
*
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
* @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
*/
<T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
long[] lastOffsets) throws Exception;
HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception;
/**
* Set the next offset to read from for the given partition.
......@@ -79,7 +70,7 @@ public interface Fetcher {
* @param topicPartition The partition for which to seek the offset.
* @param offsetToRead The offset to seek to.
*/
void seek(TopicPartition topicPartition, long offsetToRead);
void seek(KafkaTopicPartition topicPartition, long offsetToRead);
/**
* Exit run loop with given error and release all resources.
......
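
As the ZookeeperOffsetHandler change further down in this diff shows, seek() expects the next offset to read rather than the last emitted one. A minimal sketch of positioning a fetcher from the new map-based operator state under that contract (restoreFetcherPosition is a hypothetical helper, not part of this commit; it assumes the usual java.util imports and the classes introduced here):

static void restoreFetcherPosition(Fetcher fetcher, HashMap<KafkaTopicPartition, Long> restoredOffsets) {
	for (Map.Entry<KafkaTopicPartition, Long> entry : restoredOffsets.entrySet()) {
		// The stored value is the last offset that was read and emitted,
		// so seek to the next offset to avoid re-reading that record.
		fetcher.seek(entry.getKey(), entry.getValue() + 1);
	}
}
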
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internals;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A serializable representation of a Kafka topic and a partition.
* Used as operator state for the Kafka consumer.
*/
public class KafkaTopicPartition implements Serializable {
private static final long serialVersionUID = 722083576322742325L;
private final String topic;
private final int partition;
private final int cachedHash;
public KafkaTopicPartition(String topic, int partition) {
this.topic = checkNotNull(topic);
this.partition = partition;
this.cachedHash = 31 * topic.hashCode() + partition;
}
public String getTopic() {
return topic;
}
public int getPartition() {
return partition;
}
@Override
public String toString() {
return "KafkaTopicPartition{" +
"topic='" + topic + '\'' +
", partition=" + partition +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof KafkaTopicPartition)) {
return false;
}
KafkaTopicPartition that = (KafkaTopicPartition) o;
if (partition != that.partition) {
return false;
}
return topic.equals(that.topic);
}
@Override
public int hashCode() {
return cachedHash;
}
// ------------------- Utilities -------------------------------------
/**
* Returns a unique list of topics from the topic partition map
*
* @param topicPartitionMap A map keyed by KafkaTopicPartition
* @return A unique list of topics from the input map
*/
public static List<String> getTopics(Map<KafkaTopicPartition, ?> topicPartitionMap) {
HashSet<String> uniqueTopics = new HashSet<>();
for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
uniqueTopics.add(ktp.getTopic());
}
return new ArrayList<>(uniqueTopics);
}
public static String toString(Map<KafkaTopicPartition, Long> map) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
KafkaTopicPartition ktp = p.getKey();
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
}
return sb.toString();
}
/**
* Checks whether this partition is contained in the given map of KafkaTopicPartitionLeaders.
*
* @param map The map of KafkaTopicPartitionLeaders
* @return true if the element is contained.
*/
public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
for (Map.Entry<KafkaTopicPartitionLeader, ?> entry : map.entrySet()) {
if (entry.getKey().getTopicPartition().equals(this)) {
return true;
}
}
return false;
}
}
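
A short sketch of why the value-based equals/hashCode (with the cached hash) matter: KafkaTopicPartition is used as the key of the offset maps that become operator state, so a freshly constructed key must find the stored offset. The topic names and offsets below are made up:

import java.util.HashMap;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaTopicPartitionExample {

	public static void main(String[] args) {
		HashMap<KafkaTopicPartition, Long> lastOffsets = new HashMap<>();
		lastOffsets.put(new KafkaTopicPartition("topic-a", 0), 42L);
		lastOffsets.put(new KafkaTopicPartition("topic-b", 3), 1337L);

		// Equality is defined by topic and partition, so a new instance looks up the stored offset.
		System.out.println(lastOffsets.get(new KafkaTopicPartition("topic-a", 0))); // 42

		// Utility methods from the class above (iteration order of the HashMap is not guaranteed).
		System.out.println(KafkaTopicPartition.getTopics(lastOffsets)); // e.g. [topic-a, topic-b]
		System.out.println(KafkaTopicPartition.toString(lastOffsets));  // e.g. topic-a:0=42, topic-b:3=1337,
	}
}
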
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.kafka.common.Node;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* Serializable Topic Partition info with leader Node information.
* This class is used at runtime.
*/
public class KafkaTopicPartitionLeader implements Serializable {
private static final long serialVersionUID = 9145855900303748582L;
private final int leaderId;
private final int leaderPort;
private final String leaderHost;
private final KafkaTopicPartition topicPartition;
private final int cachedHash;
public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
this.topicPartition = topicPartition;
if (leader == null) {
this.leaderId = -1;
this.leaderHost = null;
this.leaderPort = -1;
} else {
this.leaderId = leader.id();
this.leaderPort = leader.port();
this.leaderHost = leader.host();
}
int cachedHash = (leader == null) ? 14 : leader.hashCode();
this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
}
public KafkaTopicPartition getTopicPartition() {
return topicPartition;
}
public Node getLeader() {
if (this.leaderId == -1) {
return null;
} else {
return new Node(leaderId, leaderHost, leaderPort);
}
}
public static String toString(List<KafkaTopicPartitionLeader> partitions) {
StringBuilder sb = new StringBuilder();
for (KafkaTopicPartitionLeader p: partitions) {
sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", ");
}
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof KafkaTopicPartitionLeader)) {
return false;
}
KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
if (!topicPartition.equals(that.topicPartition)) {
return false;
}
return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
}
@Override
public int hashCode() {
return cachedHash;
}
@Override
public String toString() {
return "KafkaTopicPartitionLeader{" +
"leaderId=" + leaderId +
", leaderPort=" + leaderPort +
", leaderHost='" + leaderHost + '\'' +
", topic=" + topicPartition.getTopic() +
", partition=" + topicPartition.getPartition() +
'}';
}
/**
* Replaces an existing entry in the given map that has the same KafkaTopicPartition, ignoring the leader.
*
* @param newKey The new topic partition (including leader information)
* @param newValue The new offset
* @param map The map to search in
* @return The old value (offset), or null if no matching entry was found
*/
public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
for (Map.Entry<KafkaTopicPartitionLeader, Long> entry: map.entrySet()) {
if (entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
Long oldValue = map.remove(entry.getKey());
if (map.put(newKey, newValue) != null) {
throw new IllegalStateException("Key was not removed before");
}
return oldValue;
}
}
return null;
}
}
......@@ -19,8 +19,6 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.util.List;
import java.util.Map;
......@@ -38,7 +36,7 @@ public interface OffsetHandler {
*
* @param offsetsToCommit The offset to commit, per partition.
*/
void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception;
/**
* Positions the given fetcher to the initial read offsets where the stream consumption
......@@ -47,7 +45,7 @@ public interface OffsetHandler {
* @param partitions The partitions for which to seeks the fetcher to the beginning.
* @param fetcher The fetcher that will pull data from Kafka and must be positioned.
*/
void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception;
/**
* Closes the offset handler, releasing all resources.
......
......@@ -25,7 +25,6 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
......@@ -71,28 +70,28 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
@Override
public void commit(Map<TopicPartition, Long> offsetsToCommit) {
for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
TopicPartition tp = entry.getKey();
public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
KafkaTopicPartition tp = entry.getKey();
long offset = entry.getValue();
if (offset >= 0) {
setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset);
}
}
}
@Override
public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
for (TopicPartition tp : partitions) {
long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
for (KafkaTopicPartitionLeader tp : partitions) {
long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
if (offset != OFFSET_NOT_SET) {
LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
tp.partition(), offset);
tp.getTopicPartition().getPartition(), offset);
// the offset in ZooKeeper is the last read offset; seek() expects the next offset to read.
fetcher.seek(tp, offset + 1);
fetcher.seek(tp.getTopicPartition(), offset + 1);
}
}
}
......
......@@ -59,12 +59,12 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable {
@Override
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
int p = 0;
for(int i = 0; i < parallelInstances; i++) {
if(i == parallelInstanceId) {
for (int i = 0; i < parallelInstances; i++) {
if (i == parallelInstanceId) {
targetPartition = partitions[p];
return;
}
if(++p == partitions.length) {
if (++p == partitions.length) {
p = 0;
}
}
......@@ -72,7 +72,7 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable {
@Override
public int partition(Object element, int numPartitions) {
if(targetPartition == -1) {
if (targetPartition == -1) {
throw new RuntimeException("The partitioner has not been initialized properly");
}
return targetPartition;
......
......@@ -19,10 +19,12 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.kafka.common.Node;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
......@@ -30,23 +32,30 @@ import java.util.Set;
import static org.junit.Assert.*;
/**
* Tests that the partition assignment is deterministic and stable.
*/
public class KafkaConsumerPartitionAssignmentTest {
private final Node fake = new Node(1337, "localhost", 1337);
@Test
public void testPartitionsEqualConsumers() {
try {
int[] partitions = {4, 52, 17, 1};
for (int i = 0; i < partitions.length; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", partitions.length, i);
List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
for (int i = 0; i < inPartitions.size(); i++) {
List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(
inPartitions, inPartitions.size(), i);
assertNotNull(parts);
assertEquals(1, parts.size());
assertTrue(contains(partitions, parts.get(0).partition()));
assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
}
}
catch (Exception e) {
......@@ -55,31 +64,43 @@ public class KafkaConsumerPartitionAssignmentTest {
}
}
private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
for (KafkaTopicPartitionLeader ktp: inPartitions) {
if (ktp.getTopicPartition().getPartition() == partition) {
return true;
}
}
return false;
}
@Test
public void testMultiplePartitionsPerConsumers() {
try {
final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
final Set<Integer> allPartitions = new HashSet<>();
for (int i : partitions) {
allPartitions.add(i);
for (int p : partitionIDs) {
KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
partitions.add(part);
allPartitions.add(part);
}
final int numConsumers = 3;
final int minPartitionsPerConsumer = partitions.length / numConsumers;
final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
final int minPartitionsPerConsumer = partitions.size() / numConsumers;
final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
for (int i = 0; i < numConsumers; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", numConsumers, i);
List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(partitions, numConsumers, i);
assertNotNull(parts);
assertTrue(parts.size() >= minPartitionsPerConsumer);
assertTrue(parts.size() <= maxPartitionsPerConsumer);
for (TopicPartition p : parts) {
for (KafkaTopicPartitionLeader p : parts) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p.partition()));
assertTrue(allPartitions.remove(p));
}
}
......@@ -95,25 +116,26 @@ public class KafkaConsumerPartitionAssignmentTest {
@Test
public void testPartitionsFewerThanConsumers() {
try {
final int[] partitions = {4, 52, 17, 1};
List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
final Set<Integer> allPartitions = new HashSet<>();
for (int i : partitions) {
allPartitions.add(i);
}
final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
allPartitions.addAll(inPartitions);
final int numConsumers = 2 * inPartitions.size() + 3;
final int numConsumers = 2 * partitions.length + 3;
for (int i = 0; i < numConsumers; i++) {
List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
partitions, "test-topic", numConsumers, i);
List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(inPartitions, numConsumers, i);
assertNotNull(parts);
assertTrue(parts.size() <= 1);
for (TopicPartition p : parts) {
for (KafkaTopicPartitionLeader p : parts) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p.partition()));
assertTrue(allPartitions.remove(p));
}
}
......@@ -125,15 +147,16 @@ public class KafkaConsumerPartitionAssignmentTest {
fail(e.getMessage());
}
}
@Test
public void testAssignEmptyPartitions() {
try {
List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(ep, 4, 2);
assertNotNull(parts1);
assertTrue(parts1.isEmpty());
List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(ep, 1, 0);
assertNotNull(parts2);
assertTrue(parts2.isEmpty());
}
......@@ -146,35 +169,36 @@ public class KafkaConsumerPartitionAssignmentTest {
@Test
public void testGrowingPartitionsRemainsStable() {
try {
final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
final Set<Integer> allNewPartitions = new HashSet<>();
final Set<Integer> allInitialPartitions = new HashSet<>();
for (int i : newPartitions) {
allNewPartitions.add(i);
}
for (int i : initialPartitions) {
allInitialPartitions.add(i);
for (int p : newPartitionIDs) {
KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
newPartitions.add(part);
}
List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
final int numConsumers = 3;
final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 0);
List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 1);
List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, "test-topic", numConsumers, 2);
final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, numConsumers, 0);
List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, numConsumers, 1);
List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumer.assignPartitions(
initialPartitions, numConsumers, 2);
assertNotNull(parts1);
assertNotNull(parts2);
assertNotNull(parts3);
assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
......@@ -182,37 +206,37 @@ public class KafkaConsumerPartitionAssignmentTest {
assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
for (TopicPartition p : parts1) {
for (KafkaTopicPartitionLeader p : parts1) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p.partition()));
assertTrue(allInitialPartitions.remove(p));
}
for (TopicPartition p : parts2) {
for (KafkaTopicPartitionLeader p : parts2) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p.partition()));
assertTrue(allInitialPartitions.remove(p));
}
for (TopicPartition p : parts3) {
for (KafkaTopicPartitionLeader p : parts3) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p.partition()));
assertTrue(allInitialPartitions.remove(p));
}
// all partitions must have been assigned
assertTrue(allInitialPartitions.isEmpty());
// grow the set of partitions and distribute anew
List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 0);
List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 1);
List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
newPartitions, "test-topic", numConsumers, 2);
List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumer.assignPartitions(
newPartitions, numConsumers, 0);
List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumer.assignPartitions(
newPartitions, numConsumers, 1);
List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumer.assignPartitions(
newPartitions, numConsumers, 2);
// new partitions must include all old partitions
assertTrue(parts1new.size() > parts1.size());
assertTrue(parts2new.size() > parts2.size());
assertTrue(parts3new.size() > parts3.size());
assertTrue(parts1new.containsAll(parts1));
assertTrue(parts2new.containsAll(parts2));
assertTrue(parts3new.containsAll(parts3));
......@@ -224,17 +248,17 @@ public class KafkaConsumerPartitionAssignmentTest {
assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
for (TopicPartition p : parts1new) {
for (KafkaTopicPartitionLeader p : parts1new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p.partition()));
assertTrue(allNewPartitions.remove(p));
}
for (TopicPartition p : parts2new) {
for (KafkaTopicPartitionLeader p : parts2new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p.partition()));
assertTrue(allNewPartitions.remove(p));
}
for (TopicPartition p : parts3new) {
for (KafkaTopicPartitionLeader p : parts3new) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p.partition()));
assertTrue(allNewPartitions.remove(p));
}
// all partitions must have been assigned
......@@ -245,13 +269,5 @@ public class KafkaConsumerPartitionAssignmentTest {
fail(e.getMessage());
}
}
private static boolean contains(int[] array, int value) {
for (int i : array) {
if (i == value) {
return true;
}
}
return false;
}
}
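
For reference, a minimal sketch of a deterministic, index-modulo assignment that satisfies the properties these tests check (stable when the partition list grows, at most one extra partition per subtask). This is an illustration only, not the actual FlinkKafkaConsumer.assignPartitions implementation; it assumes the usual java.util imports and the KafkaTopicPartitionLeader class from this commit:

static List<KafkaTopicPartitionLeader> assignPartitionsSketch(
		List<KafkaTopicPartitionLeader> allPartitions, int numConsumers, int consumerIndex) {
	List<KafkaTopicPartitionLeader> assigned = new ArrayList<>();
	for (int i = 0; i < allPartitions.size(); i++) {
		// Partition i goes to subtask (i % numConsumers); appending new partitions
		// therefore never moves an already-assigned partition to another subtask.
		if (i % numConsumers == consumerIndex) {
			assigned.add(allPartitions.get(i));
		}
	}
	return assigned;
}
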
......@@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Ignore;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.*;
......@@ -82,37 +85,45 @@ public class KafkaConsumerTest {
Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
offsetsField.setAccessible(true);
runningField.setAccessible(true);
mapField.setAccessible(true);
FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>();
long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 };
int j = 0;
for (long i: offsets) {
KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++);
testOffsets.put(ktp, i);
}
LinkedMap map = new LinkedMap();
offsetsField.set(consumer, testOffsets);
runningField.set(consumer, true);
mapField.set(consumer, map);
assertTrue(map.isEmpty());
// make multiple checkpoints
for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
assertArrayEquals(testOffsets, checkpoint);
HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
assertEquals(testOffsets, checkpoint);
// change the offsets, make sure the snapshot did not change
long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
for (int i = 0; i < testOffsets.length; i++) {
testOffsets[i] += 1L;
HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) {
testOffsets.put(e.getKey(), e.getValue() + 1);
}
assertArrayEquals(checkpointCopy, checkpoint);
assertEquals(checkpointCopy, checkpoint);
assertTrue(map.size() > 0);
assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
}
......@@ -132,7 +143,7 @@ public class KafkaConsumerTest {
props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
props.setProperty("group.id", "non-existent-group");
new FlinkKafkaConsumer<>("no op topic", new SimpleStringSchema(), props,
new FlinkKafkaConsumer<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props,
FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
}
......
......@@ -32,12 +32,16 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
......@@ -52,6 +56,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
......@@ -74,6 +80,7 @@ import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
......@@ -87,6 +94,7 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -94,7 +102,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
......@@ -148,8 +155,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
stream.print();
see.execute("No broker test");
} catch(RuntimeException re){
Assert.assertTrue("Wrong RuntimeException thrown",
re.getMessage().contains("Unable to retrieve any partitions for topic"));
Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
}
}
/**
......@@ -166,19 +173,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Assert.assertEquals(0, pendingCheckpoints.size());
source.setRuntimeContext(new MockRuntimeContext(1, 0));
final long[] initialOffsets = new long[] { 1337 };
final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
// first restore
source.restoreState(initialOffsets);
// then open
source.open(new Configuration());
long[] state1 = source.snapshotState(1, 15);
HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
assertArrayEquals(initialOffsets, state1);
assertEquals(initialOffsets, state1);
HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
Assert.assertEquals(initialOffsets, state2);
long[] state2 = source.snapshotState(2, 30);
Assert.assertArrayEquals(initialOffsets, state2);
Assert.assertEquals(2, pendingCheckpoints.size());
source.notifyCheckpointComplete(1);
......@@ -772,6 +781,92 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
public void runConsumeMultipleTopics() throws java.lang.Exception {
final int NUM_TOPICS = 5;
final int NUM_ELEMENTS = 20;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
// create topics with content
final List<String> topics = new ArrayList<>();
for (int i = 0; i < NUM_TOPICS; i++) {
final String topic = "topic-" + i;
topics.add(topic);
// create topic
createTestTopic(topic, i + 1 /*partitions*/, 1);
// write something
writeSequence(env, topic, NUM_ELEMENTS, i + 1);
}
// validate getPartitionsForTopic method
List<KafkaTopicPartitionLeader> topicPartitions = FlinkKafkaConsumer082.getPartitionsForTopic(topics, standardProps);
Assert.assertEquals((NUM_TOPICS * (NUM_TOPICS + 1))/2, topicPartitions.size());
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig());
DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(new FlinkKafkaConsumer082<>(topics, readSchema, standardProps));
stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@Override
public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
Integer count = countPerTopic.get(value.f2);
if (count == null) {
count = 1;
} else {
count++;
}
countPerTopic.put(value.f2, count);
// check map:
for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
if (el.getValue() < NUM_ELEMENTS) {
break; // not enough yet
}
if (el.getValue() > NUM_ELEMENTS) {
throw new RuntimeException("There is a failure in the test. I've read " +
el.getValue() + " from topic " + el.getKey());
}
}
// we've seen messages from all topics
throw new SuccessException();
}
}).setParallelism(1);
tryExecute(env, "Count elements from the topics");
// delete all topics again
for (int i = 0; i < NUM_TOPICS; i++) {
final String topic = "topic-" + i;
deleteTestTopic(topic);
}
}
private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
TypeSerializer ts;
public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) {
ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(ec);
}
@Override
public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
Tuple2<Integer, Integer> t2 = (Tuple2<Integer, Integer>) ts.deserialize(new ByteArrayInputView(message));
return new Tuple3<>(t2.f0, t2.f1, topic);
}
@Override
public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
return false;
}
@Override
public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
}
}
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
......@@ -816,13 +911,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
elCnt++;
if (value.f0 == -1) {
// we should have seen 11 elements now.
if(elCnt == 11) {
if (elCnt == 11) {
throw new SuccessException();
} else {
throw new RuntimeException("There have been "+elCnt+" elements");
}
}
if(elCnt > 10) {
if (elCnt > 10) {
throw new RuntimeException("More than 10 elements seen: "+elCnt);
}
}
......@@ -965,7 +1060,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Override
public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
Random rnd = new Random(1337);
for(long i = 0; i < ELEMENT_COUNT; i++) {
for (long i = 0; i < ELEMENT_COUNT; i++) {
PojoValue pojo = new PojoValue();
pojo.when = new Date(rnd.nextLong());
pojo.lon = rnd.nextLong();
......@@ -1002,13 +1097,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
// the elements should be in order.
Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
if(value.f1.lat % 2 == 0) {
if (value.f1.lat % 2 == 0) {
Assert.assertNull("key was not null", value.f0);
} else {
Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
}
counter++;
if(counter == ELEMENT_COUNT) {
if (counter == ELEMENT_COUNT) {
// we got the right number of elements
throw new SuccessException();
}
......@@ -1083,6 +1178,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n===================================");
TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
......@@ -1130,14 +1226,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// will see each message only once.
Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
if(streams.size() != 1) {
if (streams.size() != 1) {
throw new RuntimeException("Expected only one message stream but got "+streams.size());
}
List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
if(kafkaStreams == null) {
if (kafkaStreams == null) {
throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
}
if(kafkaStreams.size() != 1) {
if (kafkaStreams.size() != 1) {
throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
}
LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
......@@ -1148,7 +1244,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while(iteratorToRead.hasNext()) {
read++;
result.add(iteratorToRead.next());
if(read == stopAfter) {
if (read == stopAfter) {
LOG.info("Read "+read+" elements");
return result;
}
......
......@@ -118,4 +118,10 @@ public class KafkaITCase extends KafkaConsumerTestBase {
public void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
@Test
public void testMultipleTopics() throws Exception {
runConsumeMultipleTopics();
}
}
......@@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.serialization.TypeInformationSerializatio
import org.junit.Test;
import java.io.Serializable;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
......@@ -109,7 +110,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
// ------ consuming topology ---------
FlinkKafkaConsumer<Tuple2<Long, String>> source =
new FlinkKafkaConsumer<>(topic, deserSchema, standardProps,
new FlinkKafkaConsumer<>(Collections.singletonList(topic), deserSchema, standardProps,
FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
......
......@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
......@@ -54,6 +55,7 @@ import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
......@@ -360,7 +362,7 @@ public abstract class KafkaTestBase extends TestLogger {
catch (InterruptedException e) {
// restore interrupted state
}
List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer.getPartitionsForTopic(Collections.singletonList(topic), standardProps);
if (partitions != null && partitions.size() > 0) {
return;
}
......
......@@ -39,7 +39,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
* @param offset the offset of the message in the original source (for example the Kafka offset)
* @return The deserialized message as an object.
*/
T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException;
T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException;
/**
* Method to decide whether the element signals the end of the stream. If
......
......@@ -35,7 +35,7 @@ public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializatio
this.deserializationSchema = deserializationSchema;
}
@Override
public T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
public T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
return deserializationSchema.deserialize(message);
}
......
......@@ -79,7 +79,7 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
@Override
public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
K key = null;
if(messageKey != null) {
key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
......
......@@ -71,6 +71,11 @@ under the License.
<property name="illegalPattern" value="true"/>
<property name="message" value="Use Flink's InstantiationUtil instead of common's SerializationUtils"/>
</module>
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang\."/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use commons-lang3 instead of commons-lang."/>
</module>
<module name="NeedBraces">
<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>
</module>
......