提交 be96bd38 编写于 作者: R rdhabalia 提交者: Matteo Merli

Fix: failed producer creation leak (#927)

上级 89dda02f
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.atLeastOnce;
......@@ -33,11 +34,11 @@ import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import static java.util.UUID.randomUUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
......@@ -60,16 +61,12 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.HandlerBase.State;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
......@@ -737,4 +734,37 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
}
}
@Test
public void testCleanProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
ProducerConfiguration producerConf = new ProducerConfiguration();
admin.clusters().createCluster("global", new ClusterData());
admin.namespaces().createNamespace("my-property/global/lookup");
ClientConfiguration clientConf = new ClientConfiguration();
final int operationTimeOut = 500;
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setOperationTimeout(operationTimeOut, TimeUnit.MILLISECONDS);
PulsarClient pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
CountDownLatch latch = new CountDownLatch(1);
pulsarClient.createProducerAsync("persistent://my-property/global/lookup/my-topic1", producerConf)
.handle((producer, e) -> {
latch.countDown();
return null;
});
latch.await(operationTimeOut+1000, TimeUnit.MILLISECONDS);
Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
prodField.setAccessible(true);
@SuppressWarnings("unchecked")
IdentityHashMap<ProducerBase, Boolean> producers = (IdentityHashMap<ProducerBase, Boolean>) prodField
.get(pulsarClient);
assertTrue(producers.isEmpty());
pulsarClient.close();
log.info("-- Exiting {} test --", methodName);
}
}
......@@ -654,6 +654,7 @@ public class ConsumerImpl extends ConsumerBase {
void connectionFailed(PulsarClientException exception) {
if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Consumer creation failed for consumer {}", topic, consumerId);
client.cleanupConsumer(this);
}
}
......
......@@ -899,6 +899,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
&& producerCreatedFuture.completeExceptionally(exception)) {
log.info("[{}] Producer creation failed for producer {}", topic, producerId);
setState(State.Failed);
client.cleanupProducer(this);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册