提交 ee11e100 编写于 作者: L ltamber 提交者: Sijie Guo

[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)

### Motivation

Fixes #5597 

When using a multi-broker service url to create a producer, if the connection to the first broker failed, the creation will fail.

### Modification

Add backoff retries when getting partitioned metadata from brokers.
上级 59cf1677
...@@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.Consumer; ...@@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; ...@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
...@@ -132,5 +135,65 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { ...@@ -132,5 +135,65 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName); log.info("-- Exiting {} test --", methodName);
} }
@Test
public void testGetPartitionedTopicMetaData() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name";
try {
String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT;
if (isTcpLookup) {
url = "pulsar://localhost:51000,localhost:" + BROKER_PORT;
}
PulsarClient client = newPulsarClient(url, 0);
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = client.newProducer().topic(topicName).create();
consumer.close();
producer.close();
client.close();
} catch (PulsarClientException pce) {
log.error("create producer or consumer error: ", pce);
fail();
}
log.info("-- Exiting {} test --", methodName);
}
@Test (timeOut = 4000)
public void testGetPartitionedTopicDataTimeout() {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/my-topic1";
String url = "http://localhost:51000,localhost:51001";
if (isTcpLookup) {
url = "pulsar://localhost:51000,localhost:51001";
}
PulsarClient client;
try {
client = PulsarClient.builder()
.serviceUrl(url)
.statsInterval(0, TimeUnit.SECONDS)
.operationTimeout(3, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer = client.newProducer().topic(topicName).create();
fail();
} catch (PulsarClientException pce) {
log.error("create producer error: ", pce);
}
log.info("-- Exiting {} test --", methodName);
}
private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class); private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
} }
...@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; ...@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
...@@ -127,8 +128,9 @@ public class HttpClient implements Closeable { ...@@ -127,8 +128,9 @@ public class HttpClient implements Closeable {
public <T> CompletableFuture<T> get(String path, Class<T> clazz) { public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
final CompletableFuture<T> future = new CompletableFuture<>(); final CompletableFuture<T> future = new CompletableFuture<>();
try { try {
String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString(); URI hostUri = serviceNameResolver.resolveHostUri();
String remoteHostName = serviceNameResolver.resolveHostUri().getHost(); String requestUrl = new URL(hostUri.toURL(), path).toString();
String remoteHostName = hostUri.getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>(); CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
......
...@@ -641,17 +641,45 @@ public class PulsarClientImpl implements PulsarClient { ...@@ -641,17 +641,45 @@ public class PulsarClientImpl implements PulsarClient {
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) { public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture; CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try { try {
TopicName topicName = TopicName.get(topic); TopicName topicName = TopicName.get(topic);
metadataFuture = lookup.getPartitionedTopicMetadata(topicName); AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(0, TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(),
conf.getMaxBackoffIntervalNanos())
.create();
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
} }
return metadataFuture; return metadataFuture;
} }
private void getPartitionedTopicMetadata(TopicName topicName,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture<PartitionedTopicMetadata> future) {
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(new PulsarClientException
.TimeoutException("Could not getPartitionedTopicMetadata within configured timeout."));
return null;
}
timer.newTimeout( task -> {
remainingTime.addAndGet(-nextDelay);
getPartitionedTopicMetadata(topicName, backoff, remainingTime, future);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}
@Override @Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) { public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> { return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册