Commit ea710204 authored by Robert Metzger

[streaming][connectors] Add more configuration options to PersistentKafkaSource

This closes #607
Parent c2faa6fe
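The options this commit adds are plain entries in the Kafka consumer Properties, read with consumerConfig.props().getInt(key, default). A minimal sketch of how a job might set them; how the Properties reach PersistentKafkaSource (e.g. through a ConsumerConfig-taking constructor) is assumed here, not shown in this diff:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // standard Kafka 0.8 consumer settings
props.put("group.id", "flink-kafka-example");
// options handled by this commit, all optional:
props.put("flink.waitOnEmptyFetchMillis", "500"); // WAIT_ON_EMPTY_FETCH_KEY
props.put("flink.waitOnFailedLeaderDetection", "2000"); // WAIT_ON_FAILED_LEADER_MS_KEY (default 2000 ms)
props.put("flink.maxLeaderDetectionRetries", "3"); // MAX_FAILED_LEADER_RETRIES_KEY (default 3 attempts)
ConsumerConfig consumerConfig = new ConsumerConfig(props);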
......@@ -57,15 +57,20 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
+ public static final String WAIT_ON_FAILED_LEADER_MS_KEY = "flink.waitOnFailedLeaderDetection";
+ public static final int WAIT_ON_FAILED_LEADER_MS_DEFAULT = 2000;
+ public static final String MAX_FAILED_LEADER_RETRIES_KEY = "flink.maxLeaderDetectionRetries";
+ public static final int MAX_FAILED_LEADER_RETRIES_DEFAULT = 3;
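+ // defaults: wait 2000 ms between failed leader-detection attempts, give up after 3 retries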
private final String topicId;
private final KafkaOffset startingOffset;
private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
private transient KafkaConsumerIterator iterator;
- private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
+ private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSetOperatorState;
- private transient Map<Integer, KafkaOffset> partitions;
+ private transient Map<Integer, KafkaOffset> partitionOffsets;
/**
* Creates a persistent Kafka source that consumes a topic.
......@@ -180,6 +185,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
}
+ // ---------------------- Source lifecycle methods (open / run / cancel) -----------------
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws InterruptedException {
......@@ -193,29 +200,32 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
if (indexOfSubtask >= numberOfPartitions) {
LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions);
iterator = new KafkaIdleConsumerIterator();
} else {
if (context.containsState("kafka")) {
- kafkaOffSet = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
+ LOG.info("Initializing PersistentKafkaSource from existing state.");
+ kafkaOffSetOperatorState = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
- partitions = kafkaOffSet.getState();
+ partitionOffsets = kafkaOffSetOperatorState.getState();
} else {
- partitions = new HashMap<Integer, KafkaOffset>();
+ LOG.info("No existing state found. Creating new state.");
+ partitionOffsets = new HashMap<Integer, KafkaOffset>();
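+ // round-robin assignment: subtask i takes partitions i, i + numberOfSubtasks, i + 2 * numberOfSubtasks, ...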
for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
- partitions.put(partitionIndex, startingOffset);
+ partitionOffsets.put(partitionIndex, startingOffset);
}
- kafkaOffSet = new OperatorState<Map<Integer, KafkaOffset>>(partitions);
+ kafkaOffSetOperatorState = new OperatorState<Map<Integer, KafkaOffset>>(partitionOffsets);
- context.registerState("kafka", kafkaOffSet);
+ context.registerState("kafka", kafkaOffSetOperatorState);
}
- iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);
+ iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
if (LOG.isInfoEnabled()) {
LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
LOG.info("PersistentKafkaSource ({}/{}) listening to partitionOffsets {} of topic {}.",
indexOfSubtask + 1, numberOfSubtasks, partitionOffsets.keySet(), topicId);
}
}
......@@ -236,8 +246,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
collector.collect(out);
// TODO avoid object creation
- partitions.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
- kafkaOffSet.update(partitions);
+ partitionOffsets.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
+ kafkaOffSetOperatorState.update(partitionOffsets);
}
}
......@@ -246,6 +256,10 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
LOG.info("PersistentKafkaSource has been cancelled");
}
+ // ---------------------- (Java) Serialization methods for the consumerConfig -----------------
private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
......
......@@ -24,9 +24,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
*/
public interface KafkaConsumerIterator {
- public void initialize() throws InterruptedException;
+ void initialize() throws InterruptedException;
- public boolean hasNext();
+ boolean hasNext();
/**
* Returns the next message received from Kafka as a
......@@ -34,7 +34,7 @@ public interface KafkaConsumerIterator {
*
* @return next message as a byte array.
*/
- public byte[] next() throws InterruptedException;
+ byte[] next() throws InterruptedException;
/**
* Returns the next message and its offset received from
......@@ -42,5 +42,5 @@ public interface KafkaConsumerIterator {
*
* @return next message and its offset.
*/
- public MessageWithMetadata nextWithOffset() throws InterruptedException;
+ MessageWithMetadata nextWithOffset() throws InterruptedException;
}
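For orientation, a hypothetical driver loop over this contract (illustrative only, not part of this commit):

// drains any KafkaConsumerIterator; initialize() connects and finds leaders first
static void drain(KafkaConsumerIterator it) throws InterruptedException {
    it.initialize();
    while (it.hasNext()) {
        byte[] msg = it.next(); // or nextWithOffset() to also obtain the Kafka offset
        // hand msg off to user logic here
    }
}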
......@@ -21,6 +21,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+ /**
+ * No-op iterator. Used when more source tasks are available than Kafka partitions.
+ */
public class KafkaIdleConsumerIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class);
......
......@@ -29,6 +29,11 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+ /**
+ * Iterator over multiple Kafka partitions.
+ *
+ * This is needed when there are more partitions than Kafka source tasks.
+ */
public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
......@@ -55,9 +60,13 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
@Override
public void initialize() throws InterruptedException {
LOG.info("Initializing iterator with {} partitions", partitions.size());
String partInfo = "";
for (KafkaSinglePartitionIterator partition : partitions) {
partition.initialize();
partInfo += partition.toString() + " ";
}
LOG.info("Initialized partitions {}", partInfo);
}
@Override
......@@ -91,7 +100,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
// do not wait if a new message has been fetched
if (!gotNewMessage) {
try {
- Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
+ Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, consumerConfig.fetchWaitMaxMs()));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for new messages", e);
}
......
......@@ -29,6 +29,8 @@ import java.util.Set;
import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+ import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
......@@ -48,6 +50,8 @@ import kafka.message.MessageAndOffset;
/**
* Iterates the records received from a partition of a Kafka topic as byte arrays.
+ *
+ * This code is partly based on https://gist.github.com/ashrithr/5811266.
*/
public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
......@@ -55,8 +59,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
- private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;
private List<String> hosts;
private String topic;
private int partition;
......@@ -107,24 +109,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
* Initializes the connection by detecting the leading broker of
* the topic and establishing a connection to it.
*/
- public void initialize() throws InterruptedException {
+ public void initialize() {
if (LOG.isInfoEnabled()) {
LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
}
- PartitionMetadata metadata;
- do {
- metadata = findLeader(hosts, topic, partition);
- try {
- Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
- } catch (InterruptedException e) {
- throw new InterruptedException("Establishing connection to Kafka failed");
- }
- } while (metadata == null);
- if (metadata.leader() == null) {
- throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
- }
+ PartitionMetadata metadata = getPartitionMetadata();
leadBroker = metadata.leader();
clientName = "Client_" + topic + "_" + partition;
......@@ -134,16 +124,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
try {
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
} catch (NotLeaderForPartitionException e) {
- do {
- metadata = findLeader(hosts, topic, partition);
- try {
- Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
- } catch (InterruptedException ie) {
- throw new InterruptedException("Establishing connection to Kafka failed");
- }
- } while (metadata == null);
- readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+ throw new RuntimeException("Unable to get offset", e);
}
try {
......@@ -156,6 +137,38 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
}
}
+ private PartitionMetadata getPartitionMetadata() {
+ PartitionMetadata metadata;
+ int retry = 0;
+ int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_DEFAULT);
+ do {
+ metadata = findLeader(hosts, topic, partition);
+ if (metadata == null) {
+ retry++;
+ if (retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
+ throw new RuntimeException("Tried finding a leader " + retry + " times without success");
+ }
+ LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far: {}", waitTime, retry - 1);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Establishing connection to Kafka failed", e);
+ }
+ }
+ } while (metadata == null);
+ if (metadata.leader() == null) {
+ throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
+ }
+ return metadata;
+ }
/**
* Sets the partition to read from.
*
......@@ -185,18 +198,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
*
* @return next message as a byte array.
*/
- public byte[] next() throws InterruptedException {
+ public byte[] next() {
return nextWithOffset().getMessage();
}
- public boolean fetchHasNext() throws InterruptedException {
+ public boolean fetchHasNext() {
synchronized (fetchResponse) {
if (!iter.hasNext()) {
try {
resetFetchResponse(readOffset);
} catch (ClosedChannelException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Got ClosedChannelException, trying to find new leader.");
LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
}
findNewLeader();
}
......@@ -213,7 +226,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
*
* @return next message and its offset.
*/
- public MessageWithMetadata nextWithOffset() throws InterruptedException {
+ public MessageWithMetadata nextWithOffset() {
synchronized (fetchResponse) {
if (!iter.hasNext()) {
......@@ -243,7 +256,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
// Internal utilities
// --------------------------------------------------------------------------------------------
- private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
+ private void resetFetchResponse(long offset) throws ClosedChannelException {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
......@@ -258,10 +271,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
if (LOG.isErrorEnabled()) {
LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
LOG.error("Asked for invalid offset {}", offset);
}
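+ // the requested offset was out of range; fall back according to the consumer's 'autooffset.reset' setting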
+ String reset = consumerConfig.autoOffsetReset();
+ if (reset.equals("smallest")) {
+ LOG.info("Setting read offset to beginning (smallest)");
+ readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
+ } else if (reset.equals("largest")) {
+ LOG.info("Setting read offset to current offset (largest)");
+ readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
+ } else {
+ throw new RuntimeException("Unknown 'autooffset.reset' mode '" + reset + "'. Supported values are 'smallest' and 'largest'.");
+ }
- readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
}
findNewLeader();
......@@ -270,15 +291,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
iter = fetchResponse.messageSet(topic, partition).iterator();
}
- private void findNewLeader() throws InterruptedException {
+ private void findNewLeader() {
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition);
consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
}
- private PartitionMetadata findLeader(List<String> addresses, String a_topic,
- int a_partition) {
+ private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
PartitionMetadata returnMetaData = null;
loop:
......@@ -295,7 +315,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
- List<String> topics = Collections.singletonList(a_topic);
+ List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
......@@ -304,7 +324,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
- if (part.partitionId() == a_partition) {
+ if (part.partitionId() == partition) {
returnMetaData = part;
break loop;
}
......@@ -313,10 +333,9 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
} catch (Exception e) {
if (e instanceof ClosedChannelException) {
LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition);
"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
} else {
throw new RuntimeException("Error communicating with Broker [" + address
+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
}
} finally {
if (consumer != null) {
......@@ -333,18 +352,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
return returnMetaData;
}
- private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
+ private Broker findNewLeader(Broker oldLeader, String topic, int partition) {
for (int i = 0; i < 3; i++) {
if (LOG.isInfoEnabled()) {
LOG.info("Trying to find a new leader after Broker failure.");
}
boolean goToSleep = false;
- PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition);
+ PartitionMetadata metadata = findLeader(replicaBrokers, topic, partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
- } else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+ } else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
......@@ -362,4 +381,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
throw new RuntimeException("Unable to find new leader after Broker failure.");
}
+ public int getId() {
+ return this.partition;
+ }
+
+ @Override
+ public String toString() {
+ return "SinglePartitionIterator{partition=" + partition + " readOffset=" + readOffset + "}";
+ }
}
......@@ -19,6 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
import kafka.javaapi.consumer.SimpleConsumer;
+ /**
+ * Offset given by a message read from Kafka.
+ */
public class GivenOffset extends KafkaOffset {
private static final long serialVersionUID = 1L;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
import java.io.Serializable;
+ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
......@@ -29,6 +30,9 @@ import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
+ /**
+ * Superclass for various kinds of KafkaOffsets.
+ */
public abstract class KafkaOffset implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
......@@ -38,6 +42,15 @@ public abstract class KafkaOffset implements Serializable {
public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
String clientName);
+ /**
+ * Requests an offset for the given partition directly from its lead broker.
+ *
+ * @param consumer consumer connected to the partition's lead broker
+ * @param topic topic to query
+ * @param partition partition to query
+ * @param whichTime type of offset request (latest time / earliest time)
+ * @param clientName client id reported to the broker
+ * @return the first offset returned by the broker
+ */
protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
......@@ -49,22 +62,25 @@ public abstract class KafkaOffset implements Serializable {
OffsetResponse response = consumer.getOffsetsBefore(request);
while (response.hasError()) {
- switch (response.errorCode(topic, partition)) {
+ int errorCode = response.errorCode(topic, partition);
+ LOG.warn("Response has error. Error code: {}", errorCode);
+ switch (errorCode) {
case 6:
case 3:
LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
break;
default:
throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+ response.errorCode(topic, partition));
throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
}
- request = new kafka.javaapi.OffsetRequest(requestInfo,
- kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
response = consumer.getOffsetsBefore(request);
}
long[] offsets = response.offsets(topic, partition);
+ if (offsets.length > 1) {
+ LOG.warn("The offset response unexpectedly contained more than one offset: " + Arrays.toString(offsets) + ". Using only the first one.");
+ }
return offsets[0];
}
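The BeginningOffset/CurrentOffset imports added further up suggest that the concrete offset types delegate to getLastOffset with the matching request time. A sketch of one such subclass, inferred from the abstract signature above rather than from code shown in this diff:

import kafka.javaapi.consumer.SimpleConsumer;

public class CurrentOffset extends KafkaOffset {
    private static final long serialVersionUID = 1L;

    @Override
    public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        // LatestTime() requests the offset of the next message to be produced
        return getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    }
}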
......
......@@ -670,8 +670,7 @@ public class KafkaITCase {
createTestTopic(topic, 2, 2);
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- final String leaderToShutDown =
- kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+ final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
final Thread brokerShutdown = new Thread(new Runnable() {
@Override
......@@ -711,13 +710,14 @@ public class KafkaITCase {
consuming.addSink(new SinkFunction<String>() {
int elCnt = 0;
int start = 0;
- int numOfMessagesToReceive = 100;
+ int numOfMessagesToBeCorrect = 100;
+ int stopAfterMessages = 150;
- BitSet validator = new BitSet(numOfMessagesToReceive + 1);
+ BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
@Override
public void invoke(String value) throws Exception {
LOG.debug("Got " + value);
LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect);
String[] sp = value.split("-");
int v = Integer.parseInt(sp[1]);
......@@ -736,16 +736,15 @@ public class KafkaITCase {
shutdownKafkaBroker = true;
}
- if (elCnt == numOfMessagesToReceive && leaderHasShutDown) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear: " + nc + " Set: " + validator);
- }
- throw new SuccessException();
- } else if (elCnt == numOfMessagesToReceive) {
- numOfMessagesToReceive += 50;
- LOG.info("Waiting for more messages till {}", numOfMessagesToReceive);
- }
+ if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
+ if (elCnt >= stopAfterMessages) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear: " + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
}
}
});
......@@ -759,7 +758,9 @@ public class KafkaITCase {
LOG.info("Starting source.");
int cnt = 0;
while (running) {
collector.collect("kafka-" + cnt++);
String msg = "kafka-" + cnt++;
collector.collect(msg);
LOG.info("sending message = "+msg);
if ((cnt - 1) % 20 == 0) {
LOG.debug("Sending message #{}", cnt - 1);
......
......@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
- log4j.rootLogger=OFF, testlogger
+ log4j.rootLogger=INFO, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
......