From ee11e100d8a05296f1ddf0da6c4e52f63ca02294 Mon Sep 17 00:00:00 2001 From: ltamber Date: Thu, 21 Nov 2019 00:53:20 +0800 Subject: [PATCH] [Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603) ### Motivation Fixes #5597 When using a multi-broker service URL to create a producer, if the connection to the first broker fails, the producer creation fails. ### Modification Add backoff retries when getting partitioned topic metadata from brokers. --- .../client/PulsarBrokerStatsClientTest.java | 63 +++++++++++++++++++ .../apache/pulsar/client/impl/HttpClient.java | 6 +- .../pulsar/client/impl/PulsarClientImpl.java | 32 +++++++++- 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index fca14857f1b..00d95d15384 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.slf4j.Logger; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { @@ -132,5 +135,65 @@ public class
PulsarBrokerStatsClientTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @Test + public void testGetPartitionedTopicMetaData() throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topicName = "persistent://my-property/my-ns/my-topic1"; + final String subscriptionName = "my-subscriber-name"; + + + + try { + String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT; + if (isTcpLookup) { + url = "pulsar://localhost:51000,localhost:" + BROKER_PORT; + } + PulsarClient client = newPulsarClient(url, 0); + + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + Producer producer = client.newProducer().topic(topicName).create(); + + consumer.close(); + producer.close(); + client.close(); + } catch (PulsarClientException pce) { + log.error("create producer or consumer error: ", pce); + fail(); + } + + log.info("-- Exiting {} test --", methodName); + } + + @Test (timeOut = 4000) + public void testGetPartitionedTopicDataTimeout() { + log.info("-- Starting {} test --", methodName); + + final String topicName = "persistent://my-property/my-ns/my-topic1"; + + String url = "http://localhost:51000,localhost:51001"; + if (isTcpLookup) { + url = "pulsar://localhost:51000,localhost:51001"; + } + + PulsarClient client; + try { + client = PulsarClient.builder() + .serviceUrl(url) + .statsInterval(0, TimeUnit.SECONDS) + .operationTimeout(3, TimeUnit.SECONDS) + .build(); + + Producer producer = client.newProducer().topic(topicName).create(); + + fail(); + } catch (PulsarClientException pce) { + log.error("create producer error: ", pce); + } + + log.info("-- Exiting {} test --", methodName); + } + private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 96c62d2e9b6..845c741eb43 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import java.io.Closeable; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.URI; import java.net.URL; import java.util.Map; import java.util.Map.Entry; @@ -127,8 +128,9 @@ public class HttpClient implements Closeable { public CompletableFuture get(String path, Class clazz) { final CompletableFuture future = new CompletableFuture<>(); try { - String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString(); - String remoteHostName = serviceNameResolver.resolveHostUri().getHost(); + URI hostUri = serviceNameResolver.resolveHostUri(); + String requestUrl = new URL(hostUri.toURL(), path).toString(); + String remoteHostName = hostUri.getHost(); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); CompletableFuture> authFuture = new CompletableFuture<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 53468d3af3d..abad54b2a0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -641,17 +641,45 @@ public class PulsarClientImpl implements PulsarClient { public CompletableFuture getPartitionedTopicMetadata(String topic) { - CompletableFuture metadataFuture; + CompletableFuture metadataFuture = new CompletableFuture<>(); try { TopicName topicName = TopicName.get(topic); - metadataFuture = lookup.getPartitionedTopicMetadata(topicName); + AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs()); + 
Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMax(0, TimeUnit.MILLISECONDS) + .useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(), + conf.getMaxBackoffIntervalNanos()) + .create(); + getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } return metadataFuture; } + private void getPartitionedTopicMetadata(TopicName topicName, + Backoff backoff, + AtomicLong remainingTime, + CompletableFuture future) { + lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { + long nextDelay = Math.min(backoff.next(), remainingTime.get()); + if (nextDelay <= 0) { + future.completeExceptionally(new PulsarClientException + .TimeoutException("Could not getPartitionedTopicMetadata within configured timeout.")); + return null; + } + + timer.newTimeout( task -> { + remainingTime.addAndGet(-nextDelay); + getPartitionedTopicMetadata(topicName, backoff, remainingTime, future); + }, nextDelay, TimeUnit.MILLISECONDS); + return null; + }); + } + @Override public CompletableFuture> getPartitionsForTopic(String topic) { return getPartitionedTopicMetadata(topic).thenApply(metadata -> { -- GitLab