提交 c8b25739 编写于 作者: R Rajan Dhabalia 提交者: Penghui Li

[managed-ledger] close bk-client factory gracefully (#4580)

### Motivation
User can create tools on bookkeeper using ManagedLedger factory which provides [constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121) to create ml-factory using self-managed bookkeeper (it's not used by broker).
So, in case of self-managed bk-client, ML-Factory couldn't shutdown it gracefully and we see issue: #4573

### Modification
- ML-Factory creates `DefaultBkFactory` to create self-managed bk-client and shutdowns same bk-client while closing the resource. 
上级 d3328156
......@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
this((policyConfig) -> {
try {
return new BookKeeper(bkClientConfiguration, zkc);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}, true /* isBookkeeperManaged */, zkc, config);
ManagedLedgerFactoryConfig config) throws Exception {
this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
......@@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
cacheEvictionExecutor.execute(this::cacheEvictionTask);
}
static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
private final BookKeeper bkClient;
public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc)
throws BKException, IOException, InterruptedException {
bkClient = new BookKeeper(bkClientConfiguration, zkc);
}
@Override
public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
return bkClient;
}
}
private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
......@@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
if (isBookkeeperManaged) {
try {
BookKeeper bkFactory = bookkeeperFactory.get();
if (bkFactory != null) {
bkFactory.close();
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册