From 07d2daf4871da7b280b60d7fc71188e6cb8a4858 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 18 Jun 2017 07:41:35 -0700 Subject: [PATCH] Shutdown load manager executor on pulsar service close (#489) --- .../com/yahoo/pulsar/broker/PulsarService.java | 16 ++++++++++------ .../loadbalance/impl/ModularLoadManagerImpl.java | 16 +++++++++------- .../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +++- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index e4c5f6b5790..cb5284143c5 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -94,7 +94,7 @@ public class PulsarService implements AutoCloseable { private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("zk-cache-callback")); private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered"); - private ScheduledExecutorService loadManagerExecutor = null; + private final ScheduledExecutorService loadManagerExecutor; private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; private ScheduledFuture loadResourceQuotaTask = null; @@ -133,7 +133,8 @@ public class PulsarService implements AutoCloseable { this.brokerVersion = PulsarBrokerVersionStringUtils.getNormalizedVersionString(); this.config = config; this.shutdownService = new MessagingServiceShutdownHook(this); - loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.loadManagerExecutor = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager")); } /** @@ -174,10 +175,7 @@ public class PulsarService implements AutoCloseable { this.leaderElectionService = null; } - if (loadManagerExecutor != null) { - loadManagerExecutor.shutdownNow(); - } - loadManager = null; + loadManagerExecutor.shutdown(); if (globalZkCache != null) { globalZkCache.close(); @@ -205,6 +203,12 @@ public class PulsarService implements AutoCloseable { orderedExecutor.shutdown(); cacheExecutor.shutdown(); + + LoadManager loadManager = this.loadManager.get(); + if (loadManager != null) { + loadManager.stop(); + } + state = State.Closed; } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 084c8994db6..653ce283914 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -60,6 +60,8 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener; import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache; import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; +import io.netty.util.concurrent.DefaultThreadFactory; + public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener { private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class); @@ -163,13 +165,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach loadSheddingPipeline = new ArrayList<>(); loadSheddingPipeline.add(new OverloadShedder(conf)); preallocatedBundleToBroker = new ConcurrentHashMap<>(); - scheduler = Executors.newScheduledThreadPool(1); + scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager")); } /** * Initialize this load manager using the given PulsarService. Should be called only once, after invoking the * default constructor. - * + * * @param pulsar * The service to initialize with. */ @@ -224,7 +226,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach /** * Initialize this load manager. - * + * * @param pulsar * Client to construct this manager from. */ @@ -475,7 +477,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach /** * As any broker, disable the broker this manager is running on. - * + * * @throws PulsarServerException * If ZooKeeper failed to disable the broker. */ @@ -548,7 +550,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach /** * As the leader broker, find a suitable broker for the assignment of the given bundle. - * + * * @param serviceUnit * ServiceUnitId for the bundle. * @return The name of the selected broker, as it appears on ZooKeeper. @@ -610,7 +612,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach /** * As any broker, start the load manager. - * + * * @throws PulsarServerException * If an unexpected error prevented the load manager from being started. */ @@ -647,7 +649,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach /** * As any broker, stop the load manager. - * + * * @throws PulsarServerException * If an unexpected error occurred when attempting to stop the load manager. */ diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 48fc6b842e1..83028e0ea75 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -74,6 +74,8 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener; import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache; import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; +import io.netty.util.concurrent.DefaultThreadFactory; + public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener { private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerImpl.class); @@ -179,7 +181,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene // Perform initializations which may be done without a PulsarService. public SimpleLoadManagerImpl() { - scheduler = Executors.newScheduledThreadPool(1); + scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-simple-load-manager")); this.sortedRankings.set(new TreeMap<>()); this.currentLoadReports = new HashMap<>(); this.resourceUnitRankings = new HashMap<>(); -- GitLab