From 811dd45e87600d7163c8f74b40d377831f64bbfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Mon, 25 Nov 2019 17:20:18 +0800 Subject: [PATCH] Revert "[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)" (#5733) This reverts commit ee11e100d8a05296f1ddf0da6c4e52f63ca02294. --- .../client/PulsarBrokerStatsClientTest.java | 63 ------------------- .../apache/pulsar/client/impl/HttpClient.java | 6 +- .../pulsar/client/impl/PulsarClientImpl.java | 32 +--------- 3 files changed, 4 insertions(+), 97 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index 00d95d15384..fca14857f1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -31,8 +31,6 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.slf4j.Logger; @@ -49,7 +47,6 @@ import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { @@ -135,65 +132,5 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } - @Test - public void testGetPartitionedTopicMetaData() throws Exception { - log.info("-- Starting {} test --", methodName); - - final String topicName = "persistent://my-property/my-ns/my-topic1"; - final String subscriptionName = "my-subscriber-name"; - - - - try { - String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT; - if (isTcpLookup) { - url = "pulsar://localhost:51000,localhost:" + BROKER_PORT; - } - PulsarClient client = newPulsarClient(url, 0); - - Consumer consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); - Producer producer = client.newProducer().topic(topicName).create(); - - consumer.close(); - producer.close(); - client.close(); - } catch (PulsarClientException pce) { - log.error("create producer or consumer error: ", pce); - fail(); - } - - log.info("-- Exiting {} test --", methodName); - } - - @Test (timeOut = 4000) - public void testGetPartitionedTopicDataTimeout() { - log.info("-- Starting {} test --", methodName); - - final String topicName = "persistent://my-property/my-ns/my-topic1"; - - String url = "http://localhost:51000,localhost:51001"; - if (isTcpLookup) { - url = "pulsar://localhost:51000,localhost:51001"; - } - - PulsarClient client; - try { - client = PulsarClient.builder() - .serviceUrl(url) - .statsInterval(0, TimeUnit.SECONDS) - .operationTimeout(3, TimeUnit.SECONDS) - .build(); - - Producer producer = client.newProducer().topic(topicName).create(); - - fail(); - } catch (PulsarClientException pce) { - log.error("create producer error: ", pce); - } - - log.info("-- Exiting {} test --", methodName); - } - private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 845c741eb43..96c62d2e9b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl; import java.io.Closeable; import java.io.IOException; import java.net.HttpURLConnection; -import java.net.URI; import java.net.URL; import java.util.Map; import java.util.Map.Entry; @@ -128,9 +127,8 @@ public class HttpClient implements Closeable { public CompletableFuture get(String path, Class clazz) { final CompletableFuture future = new CompletableFuture<>(); try { - URI hostUri = serviceNameResolver.resolveHostUri(); - String requestUrl = new URL(hostUri.toURL(), path).toString(); - String remoteHostName = hostUri.getHost(); + String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString(); + String remoteHostName = serviceNameResolver.resolveHostUri().getHost(); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); CompletableFuture> authFuture = new CompletableFuture<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index abad54b2a0e..53468d3af3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -641,45 +641,17 @@ public class PulsarClientImpl implements PulsarClient { public CompletableFuture getPartitionedTopicMetadata(String topic) { - CompletableFuture metadataFuture = new CompletableFuture<>(); + CompletableFuture metadataFuture; try { TopicName topicName = TopicName.get(topic); - AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs()); - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) - .setMax(0, TimeUnit.MILLISECONDS) - .useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(), - conf.getMaxBackoffIntervalNanos()) - .create(); - getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture); + metadataFuture = lookup.getPartitionedTopicMetadata(topicName); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } return metadataFuture; } - private void getPartitionedTopicMetadata(TopicName topicName, - Backoff backoff, - AtomicLong remainingTime, - CompletableFuture future) { - lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { - long nextDelay = Math.min(backoff.next(), remainingTime.get()); - if (nextDelay <= 0) { - future.completeExceptionally(new PulsarClientException - .TimeoutException("Could not getPartitionedTopicMetadata within configured timeout.")); - return null; - } - - timer.newTimeout( task -> { - remainingTime.addAndGet(-nextDelay); - getPartitionedTopicMetadata(topicName, backoff, remainingTime, future); - }, nextDelay, TimeUnit.MILLISECONDS); - return null; - }); - } - @Override public CompletableFuture> getPartitionsForTopic(String topic) { return getPartitionedTopicMetadata(topic).thenApply(metadata -> { -- GitLab