From f88ea9df9cdac2efb3511ff82a69593f08d69e3a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 25 Jul 2019 18:44:53 -0700 Subject: [PATCH] Reuse ManagedLedgerFactory instances across SQL queries (#4813) --- .../pulsar/sql/presto/PulsarConnector.java | 5 -- .../sql/presto/PulsarConnectorCache.java | 5 +- .../pulsar/sql/presto/PulsarSplitManager.java | 89 +++++++------------ .../sql/presto/TestPulsarConnector.java | 3 +- 4 files changed, 36 insertions(+), 66 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java index 498583d3c63..1d89b519c2e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java @@ -86,11 +86,6 @@ public class PulsarConnector implements Connector { } catch (Exception e) { log.error(e, "Failed to close pulsar connector"); } - try { - PulsarConnectorCache.shutdown(); - } catch (Exception e) { - log.error("Failed to shutdown pulsar connector cache"); - } try { lifeCycleManager.stop(); } catch (Exception e) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 36ba1d2b0df..54789842b57 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -39,11 +39,14 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; + public class PulsarConnectorCache { private static final Logger log = Logger.get(PulsarConnectorCache.class); - private static PulsarConnectorCache instance; + @VisibleForTesting + static PulsarConnectorCache instance; private final ManagedLedgerFactory managedLedgerFactory; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index bb1dea22377..77b8f11831e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -133,16 +133,6 @@ public class PulsarSplitManager implements ConnectorSplitManager { return new FixedSplitSource(splits); } - @VisibleForTesting - ManagedLedgerFactory getManagedLedgerFactory() throws Exception { - ClientConfiguration bkClientConfiguration = new ClientConfiguration() - .setZkServers(this.pulsarConnectorConfig.getZookeeperUri()) - .setClientTcpNoDelay(false) - .setStickyReadsEnabled(false) - .setUseV2WireProtocol(true); - return new ManagedLedgerFactoryImpl(bkClientConfiguration); - } - @VisibleForTesting Collection getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain) throws Exception { @@ -165,59 +155,40 @@ public class PulsarSplitManager implements ConnectorSplitManager { int splitRemainder = actualNumSplits % numPartitions; - ManagedLedgerFactory managedLedgerFactory = getManagedLedgerFactory(); - - try { - List splits = new LinkedList<>(); - for (int i = 0; i < numPartitions; i++) { - - int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition; - splits.addAll( - getSplitsForTopic( - topicName.getPartition(i).getPersistenceNamingEncoding(), - managedLedgerFactory, - splitsForThisPartition, - tableHandle, - schemaInfo, - topicName.getPartition(i).getLocalName(), - tupleDomain) - ); - } - return splits; - } finally { - if (managedLedgerFactory != null) { - try { - managedLedgerFactory.shutdown(); - } catch (Exception e) { - log.error(e); - } - } + ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig) + .getManagedLedgerFactory(); + + List splits = new LinkedList<>(); + for (int i = 0; i < numPartitions; i++) { + + int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition; + splits.addAll( + getSplitsForTopic( + topicName.getPartition(i).getPersistenceNamingEncoding(), + managedLedgerFactory, + splitsForThisPartition, + tableHandle, + schemaInfo, + topicName.getPartition(i).getLocalName(), + tupleDomain)); } + return splits; } @VisibleForTesting - Collection getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle - tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain) throws Exception { - ManagedLedgerFactory managedLedgerFactory = null; - try { - managedLedgerFactory = getManagedLedgerFactory(); - - return getSplitsForTopic( - topicName.getPersistenceNamingEncoding(), - managedLedgerFactory, - numSplits, - tableHandle, - schemaInfo, - tableHandle.getTableName(), tupleDomain); - } finally { - if (managedLedgerFactory != null) { - try { - managedLedgerFactory.shutdown(); - } catch (Exception e) { - log.error(e); - } - } - } + Collection getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, + PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain) + throws Exception { + ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig) + .getManagedLedgerFactory(); + + return getSplitsForTopic( + topicName.getPersistenceNamingEncoding(), + managedLedgerFactory, + numSplits, + tableHandle, + schemaInfo, + tableHandle.getTableName(), tupleDomain); } @VisibleForTesting diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 77213240da2..84228d56d1a 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -920,7 +920,8 @@ public abstract class TestPulsarConnector { } }); - doReturn(managedLedgerFactory).when(this.pulsarSplitManager).getManagedLedgerFactory(); + PulsarConnectorCache.instance = mock(PulsarConnectorCache.class); + when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory); for (Map.Entry split : splits.entrySet()) { -- GitLab