From c8b257397f989dc5fef2344b71d01fd4a228a606 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 24 Jun 2019 18:16:12 -0700 Subject: [PATCH] [managed-ledger] close bk-client factory gracefully (#4580) ### Motivation User can create tools on bookkeeper using ManagedLedger factory which provides [constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121) to create ml-factory using self-managed bookkeeper (it's not used by broker). So, in case of self-managed bk-client, ML-Factory couldn't shutdown it gracefully and we see issue: #4573 ### Modification - ML-Factory creates `DefaultBkFactory` to create self-managed bk-client and shutdowns same bk-client while closing the resource. --- .../impl/ManagedLedgerFactoryImpl.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 95ee6b40fb1..39ff102fb21 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -27,6 +27,7 @@ import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration, - ManagedLedgerFactoryConfig config) - throws Exception { - this((policyConfig) -> { - try { - return new BookKeeper(bkClientConfiguration, zkc); - } catch (Exception e) { - throw new IllegalStateException(e); - } - }, true /* isBookkeeperManaged */, zkc, config); + ManagedLedgerFactoryConfig config) throws Exception { + this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config); } public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception { @@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { cacheEvictionExecutor.execute(this::cacheEvictionTask); } + static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { + + private final BookKeeper bkClient; + + public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc) + throws BKException, IOException, InterruptedException { + bkClient = new BookKeeper(bkClientConfiguration, zkc); + } + + @Override + public BookKeeper get(EnsemblePlacementPolicyConfig policy) { + return bkClient; + } + } + private synchronized void refreshStats() { long now = System.nanoTime(); long period = now - lastStatTimestamp; @@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { if (isBookkeeperManaged) { try { - BookKeeper bkFactory = bookkeeperFactory.get(); - if (bkFactory != null) { - bkFactory.close(); + BookKeeper bookkeeper = bookkeeperFactory.get(); + if (bookkeeper != null) { + bookkeeper.close(); } } catch (BKException e) { throw new ManagedLedgerException(e); -- GitLab