提交 cb34e976 编写于 作者: G Gábor Hermann 提交者: Robert Metzger

[FLINK-1753] [streaming] Added Kafka broker failure test

This closes #589
上级 e5a3b95a
......@@ -1257,12 +1257,12 @@ Example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
stream.addSink(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
stream.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.addSink(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
stream.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
{% endhighlight %}
</div>
</div>
......@@ -1291,6 +1291,25 @@ stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringS
</div>
</div>
The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
SerializationSchema<IN, byte[]> serializationSchema)
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
SerializationSchema serializationSchema)
{% endhighlight %}
</div>
</div>
If this constructor is used, the user needs to make sure to set the broker with the "metadata.broker.list" property. Also the serializer configuration should be left default, the serialization should be set via SerializationSchema.
More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
[Back to top](#top)
......
......@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka.api;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.java.ClosureCleaner;
......@@ -27,6 +28,8 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
......@@ -45,8 +48,10 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
private Producer<IN, byte[]> producer;
private Properties props;
private Properties userDefinedProperties;
private String topicId;
private String zookeeperAddress;
private SerializationSchema<IN, byte[]> schema;
......@@ -54,8 +59,8 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
/**
* Creates a KafkaSink for a given topic. The partitioner distributes the
* messages between the partitions of the topics.
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
......@@ -64,27 +69,27 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* @param serializationSchema
* User defined serialization schema.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public KafkaSink(String zookeeperAddress, String topicId,
SerializationSchema<IN, byte[]> serializationSchema) {
this(zookeeperAddress, topicId, serializationSchema, (Class) null);
this(zookeeperAddress, topicId, new Properties(), serializationSchema);
}
/**
* Creates a KafkaSink for a given topic. The sink produces its input into
* the topic.
* Creates a KafkaSink for a given topic with custom Producer configuration.
* If you use this constructor, the broker should be set with the "metadata.broker.list"
* configuration.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
* ID of the Kafka topic.
* @param producerConfig
* Configurations of the Kafka producer
* @param serializationSchema
* User defined serialization schema.
* @param partitioner
* User defined partitioner.
*/
public KafkaSink(String zookeeperAddress, String topicId,
SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
SerializationSchema<IN, byte[]> serializationSchema) {
NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
Preconditions.checkNotNull(topicId, "TopicID not set");
ClosureCleaner.ensureSerializable(partitioner);
......@@ -92,18 +97,32 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
this.schema = serializationSchema;
this.partitionerClass = null;
this.userDefinedProperties = producerConfig;
}
/**
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
* @param partitioner
* User defined partitioner.
*/
public KafkaSink(String zookeeperAddress, String topicId,
SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
this(zookeeperAddress, topicId, serializationSchema);
this.partitioner = partitioner;
}
public KafkaSink(String zookeeperAddress, String topicId,
SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
Preconditions.checkNotNull(topicId, "TopicID not set");
ClosureCleaner.ensureSerializable(partitioner);
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
this.schema = serializationSchema;
this(zookeeperAddress, topicId, serializationSchema);
this.partitionerClass = partitioner;
}
......@@ -114,33 +133,42 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
public void open(Configuration configuration) {
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
props = new Properties();
if (LOG.isInfoEnabled()) {
LOG.info("Broker list: {}", listOfBrokers);
}
Properties properties = new Properties();
props.put("metadata.broker.list", brokerAddress);
props.put("request.required.acks", "1");
properties.put("metadata.broker.list", listOfBrokers);
properties.put("request.required.acks", "-1");
properties.put("message.send.max.retries", "10");
props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
// this will not be used as the key will not be serialized
props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
}
if (partitioner != null) {
props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
// java serialization will do the rest.
props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
}
if (partitionerClass != null) {
props.put("partitioner.class", partitionerClass);
properties.put("partitioner.class", partitionerClass);
}
ProducerConfig config = new ProducerConfig(props);
ProducerConfig config = new ProducerConfig(properties);
try {
producer = new Producer<IN, byte[]>(config);
} catch (NullPointerException e) {
throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddress, e);
throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e);
}
}
......
......@@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.kafka.api.simple;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
......@@ -31,6 +34,8 @@ import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.api.TopicMetadata;
import kafka.cluster.Broker;
import kafka.common.LeaderNotAvailableException;
import kafka.common.UnknownTopicOrPartitionException;
import scala.collection.JavaConversions;
import scala.collection.Seq;
......@@ -42,22 +47,29 @@ public class KafkaTopicUtils {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
private final ZkClient zkClient;
private ZkClient zkClient;
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
private final String zookeeperAddress;
private final int sessionTimeoutMs;
private final int connectionTimeoutMs;
private volatile boolean isRunning = false;
public KafkaTopicUtils(String zookeeperServer) {
this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
}
public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) {
zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
new KafkaZKStringSerializer());
zkClient.waitUntilConnected();
this.zookeeperAddress = zookeeperAddress;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
}
public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
LOG.info("Creating Kafka topic '{}'", topicName);
Properties topicConfig = new Properties();
if (topicExists(topicName)) {
......@@ -65,36 +77,150 @@ public class KafkaTopicUtils {
LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
}
} else {
LOG.info("Connecting zookeeper");
initZkClient();
AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
closeZkClient();
}
}
public String getBrokerList(String topicName) {
return getBrokerAddressList(getBrokerAddresses(topicName));
}
public String getBrokerList(String topicName, int partitionId) {
return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
}
public Set<String> getBrokerAddresses(String topicName) {
int numOfPartitions = getNumberOfPartitions(topicName);
HashSet<String> brokers = new HashSet<String>();
for (int i = 0; i < numOfPartitions; i++) {
brokers.addAll(getBrokerAddresses(topicName, i));
}
return brokers;
}
public Set<String> getBrokerAddresses(String topicName, int partitionId) {
PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
HashSet<String> addresses = new HashSet<String>();
for (Broker broker : inSyncReplicas) {
addresses.add(broker.connectionString());
}
return addresses;
}
private static String getBrokerAddressList(Set<String> brokerAddresses) {
StringBuilder brokerAddressList = new StringBuilder("");
for (String broker : brokerAddresses) {
brokerAddressList.append(broker);
brokerAddressList.append(',');
}
brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
return brokerAddressList.toString();
}
public int getNumberOfPartitions(String topicName) {
Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
}
public String getLeaderBrokerAddressForTopic(String topicName) {
TopicMetadata topicInfo = getTopicInfo(topicName);
public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
isRunning = true;
PartitionMetadata partitionMetadata = null;
while (isRunning) {
try {
partitionMetadata = getPartitionMetadata(topicName, partitionId);
return partitionMetadata;
} catch (LeaderNotAvailableException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got {} trying to fetch metadata again", e.getMessage());
}
}
}
isRunning = false;
return partitionMetadata;
}
public PartitionMetadata getPartitionMetadata(String topicName, int partitionId) {
PartitionMetadata partitionMetadata = getPartitionMetadataWithErrorCode(topicName, partitionId);
switch (partitionMetadata.errorCode()) {
case 0:
return partitionMetadata;
case 3:
throw new UnknownTopicOrPartitionException("While fetching metadata for " + topicName + " / " + partitionId);
case 5:
throw new LeaderNotAvailableException("While fetching metadata for " + topicName + " / " + partitionId);
default:
throw new RuntimeException("Unknown error occurred while fetching metadata for "
+ topicName + " / " + partitionId + ", with error code: " + partitionMetadata.errorCode());
}
}
private PartitionMetadata getPartitionMetadataWithErrorCode(String topicName, int partitionId) {
TopicMetadata topicInfo = getTopicMetadata(topicName);
Collection<PartitionMetadata> partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata());
PartitionMetadata partitionMetadata = partitions.iterator().next();
Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
Iterator<PartitionMetadata> iterator = partitions.iterator();
for (PartitionMetadata partition : partitions) {
if (partition.partitionId() == partitionId) {
return partition;
}
}
return leader.connectionString();
throw new RuntimeException("No such partition: " + topicName + " / " + partitionId);
}
public TopicMetadata getTopicInfo(String topicName) {
public TopicMetadata getTopicMetadata(String topicName) {
TopicMetadata topicMetadata = getTopicMetadataWithErrorCode(topicName);
switch (topicMetadata.errorCode()) {
case 0:
return topicMetadata;
case 3:
throw new UnknownTopicOrPartitionException("While fetching metadata for topic " + topicName);
case 5:
throw new LeaderNotAvailableException("While fetching metadata for topic " + topicName);
default:
throw new RuntimeException("Unknown error occurred while fetching metadata for topic "
+ topicName + ", with error code: " + topicMetadata.errorCode());
}
}
private TopicMetadata getTopicMetadataWithErrorCode(String topicName) {
if (topicExists(topicName)) {
return AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
initZkClient();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
closeZkClient();
return topicMetadata;
} else {
throw new RuntimeException("Topic does not exist: " + topicName);
}
}
public boolean topicExists(String topicName) {
return AdminUtils.topicExists(zkClient, topicName);
initZkClient();
boolean topicExists = AdminUtils.topicExists(zkClient, topicName);
closeZkClient();
return topicExists;
}
private void initZkClient() {
zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
new KafkaZKStringSerializer());
zkClient.waitUntilConnected();
}
private void closeZkClient() {
zkClient.close();
zkClient = null;
}
private static class KafkaZKStringSerializer implements ZkSerializer {
......
......@@ -167,8 +167,6 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
if (indexOfSubtask >= numberOfPartitions) {
iterator = new KafkaIdleConsumerIterator();
} else {
......@@ -188,7 +186,7 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
context.registerState("kafka", kafkaOffSet);
}
iterator = getMultiKafkaIterator(brokerAddress, topicId, partitions, waitOnEmptyFetchMillis);
iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize);
if (LOG.isInfoEnabled()) {
LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
......@@ -199,10 +197,6 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
iterator.initialize();
}
protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch, this.connectTimeoutMs, this.bufferSize);
}
@Override
public void run(Collector<OUT> collector) throws Exception {
isRunning = true;
......
......@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
......@@ -33,25 +34,22 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
protected List<KafkaSinglePartitionIterator> partitions;
protected final int waitOnEmptyFetch;
public KafkaMultiplePartitionsIterator(String hostName, String topic,
public KafkaMultiplePartitionsIterator(String topic,
Map<Integer, KafkaOffset> partitionsWithOffset,
KafkaTopicUtils kafkaTopicUtils,
int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
String[] hostAndPort = hostName.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
this.waitOnEmptyFetch = waitOnEmptyFetch;
for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
partitions.add(new KafkaSinglePartitionIterator(
host,
port,
topic,
partitionWithOffset.getKey(),
partitionWithOffset.getValue(), connectTimeoutMs, bufferSize));
partitionWithOffset.getValue(),
kafkaTopicUtils,
connectTimeoutMs,
bufferSize));
}
}
......
......@@ -19,11 +19,14 @@ package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
......@@ -32,7 +35,9 @@ import org.slf4j.LoggerFactory;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.NotLeaderForPartitionException;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
......@@ -53,34 +58,43 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private List<String> hosts;
private String topic;
private int port;
private int partition;
private long readOffset;
private transient SimpleConsumer consumer;
private List<String> replicaBrokers;
private String clientName;
private String leadBroker;
private Broker leadBroker;
private final int connectTimeoutMs;
private final int bufferSize;
private KafkaOffset initialOffset;
private transient Iterator<MessageAndOffset> iter;
private transient FetchResponse fetchResponse;
private volatile boolean isRunning;
/**
* Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
* we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
*
* @param hostName Hostname of a known Kafka broker
* @param port Port of the known Kafka broker
* @param topic Name of the topic to listen to
* @param partition Partition in the chosen topic
* @param topic
* Name of the topic to listen to
* @param partition
* Partition in the chosen topic
* @param initialOffset
* Offset to start consuming at
* @param kafkaTopicUtils
* Util for receiving topic metadata
* @param connectTimeoutMs
* Connection timeout in milliseconds
* @param bufferSize
* Size of buffer
*/
public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset,
int connectTimeoutMs, int bufferSize) {
this.hosts = new ArrayList<String>();
hosts.add(hostName);
this.port = port;
public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
KafkaTopicUtils kafkaTopicUtils, int connectTimeoutMs, int bufferSize) {
Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
this.hosts = new ArrayList<String>(brokerAddresses);
this.connectTimeoutMs = connectTimeoutMs;
this.bufferSize = bufferSize;
this.topic = topic;
......@@ -88,7 +102,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
this.initialOffset = initialOffset;
replicaBrokers = new ArrayList<String>();
this.replicaBrokers = new ArrayList<String>();
}
// --------------------------------------------------------------------------------------------
......@@ -100,29 +114,55 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
* the topic and establishing a connection to it.
*/
public void initialize() throws InterruptedException {
if (LOG.isInfoEnabled()) {
LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
}
PartitionMetadata metadata;
isRunning = true;
do {
metadata = findLeader(hosts, port, topic, partition);
metadata = findLeader(hosts, topic, partition);
try {
Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
} catch (InterruptedException e) {
throw new InterruptedException("Establishing connection to Kafka failed");
}
} while (metadata == null);
} while (isRunning && metadata == null);
isRunning = false;
if (metadata.leader() == null) {
throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
+ ":" + port);
throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
}
leadBroker = metadata.leader().host();
leadBroker = metadata.leader();
clientName = "Client_" + topic + "_" + partition;
consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
try {
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
} catch (NotLeaderForPartitionException e) {
do {
metadata = findLeader(hosts, topic, partition);
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
try {
Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
} catch (InterruptedException ie) {
throw new InterruptedException("Establishing connection to Kafka failed");
}
} while (metadata == null);
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
}
resetFetchResponse(readOffset);
try {
resetFetchResponse(readOffset);
} catch (ClosedChannelException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Got ClosedChannelException, trying to find new leader.");
}
findNewLeader();
}
}
/**
......@@ -161,7 +201,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
public boolean fetchHasNext() throws InterruptedException {
synchronized (fetchResponse) {
if (!iter.hasNext()) {
resetFetchResponse(readOffset);
try {
resetFetchResponse(readOffset);
} catch (ClosedChannelException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Got ClosedChannelException, trying to find new leader.");
}
findNewLeader();
}
return iter.hasNext();
} else {
return true;
......@@ -205,7 +252,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
// Internal utilities
// --------------------------------------------------------------------------------------------
private void resetFetchResponse(long offset) throws InterruptedException {
private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
......@@ -225,24 +272,43 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition, port);
findNewLeader();
}
iter = fetchResponse.messageSet(topic, partition).iterator();
}
private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
private void findNewLeader() throws InterruptedException {
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition);
consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), 100000, 64 * 1024, clientName);
}
@SuppressWarnings("ConstantConditions")
private PartitionMetadata findLeader(List<String> addresses, String a_topic,
int a_partition) {
PartitionMetadata returnMetaData = null;
loop:
for (String seed : a_hosts) {
for (String address : addresses) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to find leader via broker: {}", address);
}
String[] split = address.split(":");
String host = split[0];
int port = Integer.parseInt(split[1]);
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, connectTimeoutMs, bufferSize, "leaderLookup");
consumer = new SimpleConsumer(host, port, connectTimeoutMs, bufferSize, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
......@@ -255,8 +321,13 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
}
}
} catch (Exception e) {
throw new RuntimeException("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
if (e instanceof ClosedChannelException) {
LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition);
} else {
throw new RuntimeException("Error communicating with Broker [" + address
+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
}
} finally {
if (consumer != null) {
consumer.close();
......@@ -266,30 +337,31 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
if (returnMetaData != null) {
replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
replicaBrokers.add(replica.host());
replicaBrokers.add(replica.host() + ":" + replica.port());
}
}
return returnMetaData;
}
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException {
@SuppressWarnings({"ConstantConditions", "UnusedAssignment"})
private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
for (int i = 0; i < 3; i++) {
if (LOG.isInfoEnabled()) {
LOG.info("Trying to find a new leader after Broker failure.");
}
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition);
PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
} else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
return metadata.leader();
}
if (goToSleep) {
try {
......
......@@ -21,6 +21,9 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
......@@ -28,6 +31,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
public abstract class KafkaOffset implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
private static final long serialVersionUID = 1L;
public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
......@@ -38,14 +43,27 @@ public abstract class KafkaOffset implements Serializable {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+ response.errorCode(topic, partition));
while (response.hasError()) {
switch (response.errorCode(topic, partition)) {
case 6:
case 3:
LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
break;
default:
throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+ response.errorCode(topic, partition));
}
request = new kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
response = consumer.getOffsetsBefore(request);
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
......
......@@ -24,7 +24,9 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.SerializationUtils;
......@@ -43,6 +45,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
......@@ -58,7 +61,6 @@ import org.slf4j.LoggerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
/**
* Code in this test is based on the following GitHub repository:
......@@ -70,9 +72,9 @@ import kafka.utils.Time;
public class KafkaITCase {
private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
private static final int NUMBER_OF_KAFKA_SERVERS = 3;
private static int zkPort;
private static int kafkaPort;
private static String kafkaHost;
private static String zookeeperConnectionString;
......@@ -80,30 +82,39 @@ public class KafkaITCase {
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
public static File tmpZkDir;
public static File tmpKafkaDir;
public static List<File> tmpKafkaDirs;
private static TestingServer zookeeper;
private static KafkaServer broker1;
private static List<KafkaServer> brokers;
private static boolean shutdownKafkaBroker;
@BeforeClass
public static void prepare() throws IOException {
LOG.info("Starting KafkaITCase.prepare()");
tmpZkDir = tempFolder.newFolder();
tmpKafkaDir = tempFolder.newFolder();
tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
tmpKafkaDirs.add(tempFolder.newFolder());
}
kafkaHost = InetAddress.getLocalHost().getHostName();
zkPort = NetUtils.getAvailablePort();
kafkaPort = NetUtils.getAvailablePort();
zookeeperConnectionString = "localhost:" + zkPort;
zookeeper = null;
broker1 = null;
brokers = null;
try {
LOG.info("Starting Zookeeper");
zookeeper = getZookeeper();
LOG.info("Starting KafkaServer");
broker1 = getKafkaServer(0);
brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
}
LOG.info("ZK and KafkaServer started.");
} catch (Throwable t) {
LOG.warn("Test failed with exception", t);
......@@ -114,8 +125,10 @@ public class KafkaITCase {
@AfterClass
public static void shutDownServices() {
LOG.info("Shutting down all services");
if (broker1 != null) {
broker1.shutdown();
for (KafkaServer broker : brokers) {
if (broker != null) {
broker.shutdown();
}
}
if (zookeeper != null) {
try {
......@@ -131,7 +144,7 @@ public class KafkaITCase {
LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
String topic = "regularKafkaSourceTestTopic";
createTestTopic(topic, 1);
createTestTopic(topic, 1, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
......@@ -145,7 +158,7 @@ public class KafkaITCase {
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
LOG.info("Got " + value);
LOG.debug("Got " + value);
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
......@@ -217,7 +230,7 @@ public class KafkaITCase {
LOG.info("Starting KafkaITCase.tupleTestTopology()");
String topic = "tupleTestTopic";
createTestTopic(topic, 1);
createTestTopic(topic, 1, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
......@@ -231,7 +244,7 @@ public class KafkaITCase {
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
LOG.info("Got " + value);
LOG.debug("Got " + value);
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
......@@ -305,8 +318,8 @@ public class KafkaITCase {
LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
String topic = "customPartitioningTestTopic";
createTestTopic(topic, 3);
createTestTopic(topic, 3, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
......@@ -323,7 +336,7 @@ public class KafkaITCase {
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
LOG.info("Got " + value);
LOG.debug("Got " + value);
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
......@@ -412,6 +425,7 @@ public class KafkaITCase {
public int partition(Object key, int numPartitions) {
partitionerHasBeenCalled = true;
@SuppressWarnings("unchecked")
Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
if (tuple.f0 < 10) {
return 0;
......@@ -441,14 +455,13 @@ public class KafkaITCase {
public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
return false;
}
}
@Test
public void simpleTestTopology() throws Exception {
String topic = "simpleTestTopic";
createTestTopic(topic, 1);
createTestTopic(topic, 1, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
......@@ -462,7 +475,7 @@ public class KafkaITCase {
@Override
public void invoke(String value) throws Exception {
LOG.info("Got " + value);
LOG.debug("Got " + value);
String[] sp = value.split("-");
int v = Integer.parseInt(sp[1]);
if (start == -1) {
......@@ -524,13 +537,149 @@ public class KafkaITCase {
}
}
private static boolean leaderHasShutDown = false;
@Test
public void brokerFailureTest() throws Exception {
String topic = "brokerFailureTestTopic";
createTestTopic(topic, 2, 2);
private void createTestTopic(String topic, int numberOfPartitions) {
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
final String leaderToShutDown =
kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
final Thread brokerShutdown = new Thread(new Runnable() {
@Override
public void run() {
shutdownKafkaBroker = false;
while (!shutdownKafkaBroker) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interruption", e);
}
}
for (KafkaServer kafkaServer : brokers) {
if (leaderToShutDown.equals(
kafkaServer.config().advertisedHostName()
+ ":"
+ kafkaServer.config().advertisedPort()
)) {
LOG.info("Killing Kafka Server {}", leaderToShutDown);
kafkaServer.shutdown();
leaderHasShutDown = true;
break;
}
}
}
});
brokerShutdown.start();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
// add consuming topology:
DataStreamSource<String> consuming = env.addSource(
new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING));
consuming.setParallelism(1);
consuming.addSink(new SinkFunction<String>() {
int elCnt = 0;
int start = 0;
int numOfMessagesToReceive = 100;
BitSet validator = new BitSet(numOfMessagesToReceive + 1);
@Override
public void invoke(String value) throws Exception {
LOG.debug("Got " + value);
String[] sp = value.split("-");
int v = Integer.parseInt(sp[1]);
if (start == -1) {
start = v;
}
Assert.assertFalse("Received tuple twice", validator.get(v - start));
if (v - start < 0 && LOG.isWarnEnabled()) {
LOG.warn("Not in order: {}", value);
}
validator.set(v - start);
elCnt++;
if (elCnt == 20) {
// shut down a Kafka broker
shutdownKafkaBroker = true;
}
if (elCnt == numOfMessagesToReceive && leaderHasShutDown) {
// check if everything in the bitset is set to true
int nc;
if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
// throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
System.out.println("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
}
throw new SuccessException();
} else if (elCnt == numOfMessagesToReceive) {
numOfMessagesToReceive += 50;
LOG.info("Waiting for more messages till {}", numOfMessagesToReceive);
}
}
});
// add producing topology
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
boolean running = true;
@Override
public void run(Collector<String> collector) throws Exception {
LOG.info("Starting source.");
int cnt = 0;
while (running) {
collector.collect("kafka-" + cnt++);
if ((cnt - 1) % 20 == 0) {
LOG.debug("Sending message #{}", cnt - 1);
}
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
@Override
public void cancel() {
LOG.info("Source got chancel()");
running = false;
}
});
stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()))
.setParallelism(1);
try {
env.setParallelism(1);
env.execute();
} catch (JobExecutionException good) {
Throwable t = good.getCause();
int limit = 0;
while (!(t instanceof SuccessException)) {
t = t.getCause();
if (limit++ == 20) {
LOG.warn("Test failed with exception", good);
Assert.fail("Test failed with: " + good.getMessage());
}
}
}
}
private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor);
}
private static TestingServer getZookeeper() throws Exception {
return new TestingServer(zkPort, tmpZkDir);
}
......@@ -538,42 +687,24 @@ public class KafkaITCase {
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
*/
private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
Properties kafkaProperties = new Properties();
int kafkaPort = NetUtils.getAvailablePort();
// properties have to be Strings
kafkaProperties.put("advertised.host.name", kafkaHost);
kafkaProperties.put("port", Integer.toString(kafkaPort));
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpKafkaDir.toString());
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
KafkaServer server = new KafkaServer(kafkaConfig, new LocalSystemTime());
KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
server.startup();
return server;
}
public static class LocalSystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
LOG.warn("Interruption", e);
}
}
}
public static class SuccessException extends Exception {
private static final long serialVersionUID = 1L;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.test.TestingServer;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.api.PartitionMetadata;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
public class KafkaTopicUtilsTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
private static final int NUMBER_OF_BROKERS = 2;
private static final String TOPIC = "myTopic";
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void test() {
int zkPort;
String kafkaHost;
String zookeeperConnectionString;
File tmpZkDir;
List<File> tmpKafkaDirs;
Map<String, KafkaServer> kafkaServers = null;
TestingServer zookeeper = null;
try {
tmpZkDir = tempFolder.newFolder();
tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
tmpKafkaDirs.add(tempFolder.newFolder());
}
zkPort = NetUtils.getAvailablePort();
kafkaHost = InetAddress.getLocalHost().getHostName();
zookeeperConnectionString = "localhost:" + zkPort;
// init zookeeper
zookeeper = new TestingServer(zkPort, tmpZkDir);
// init kafka kafkaServers
kafkaServers = new HashMap<String, KafkaServer>();
for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
}
// create Kafka topic
final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
kafkaTopicUtils.createTopic(TOPIC, 1, 2);
// check whether topic exists
assertTrue(kafkaTopicUtils.topicExists(TOPIC));
// check number of partitions
assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
// get partition metadata without error
PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
assertEquals(0, partitionMetadata.errorCode());
// get broker list
assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC));
} catch (IOException e) {
fail(e.toString());
} catch (Exception e) {
fail(e.toString());
} finally {
LOG.info("Shutting down all services");
for (KafkaServer broker : kafkaServers.values()) {
if (broker != null) {
broker.shutdown();
}
}
if (zookeeper != null) {
try {
zookeeper.stop();
} catch (IOException e) {
LOG.warn("ZK.stop() failed", e);
}
}
}
}
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
*/
private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException {
Properties kafkaProperties = new Properties();
int kafkaPort = NetUtils.getAvailablePort();
// properties have to be Strings
kafkaProperties.put("advertised.host.name", kafkaHost);
kafkaProperties.put("port", Integer.toString(kafkaPort));
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
server.startup();
return server;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.utils.Time;
public class KafkaLocalSystemTime implements Time {
private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
LOG.warn("Interruption", e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册