From 70b2a3523d9c7a5fa97f90767a2c1cbe989ec635 Mon Sep 17 00:00:00 2001 From: Brad McMillen Date: Fri, 16 Jun 2017 12:27:44 -0700 Subject: [PATCH] Remove bundle from the ephemeral preallocated map once it is loaded (#481) --- .../impl/ModularLoadManagerImpl.java | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index ac97797abdb..084c8994db6 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -182,6 +182,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach if (log.isDebugEnabled()) { log.debug("Update Received for path {}", path); } + reapDeadBrokerPreallocations(data); scheduler.submit(ModularLoadManagerImpl.this::updateAll); } }); @@ -244,6 +245,30 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach } } + // For each broker that we have a recent load report, see if they are still alive + private void reapDeadBrokerPreallocations(Set aliveBrokers) { + for ( String broker : loadData.getBrokerData().keySet() ) { + if ( !aliveBrokers.contains(broker)) { + if ( log.isDebugEnabled() ) { + log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", broker); + } + final Iterator> iterator = preallocatedBundleToBroker.entrySet().iterator(); + while ( iterator.hasNext() ) { + Map.Entry entry = iterator.next(); + final String preallocatedBundle = entry.getKey(); + final String preallocatedBroker = entry.getValue(); + if ( broker.equals(preallocatedBroker) ) { + if ( log.isDebugEnabled() ) { + log.debug("Removing old preallocation on dead broker {} for bundle {}", + preallocatedBroker, preallocatedBundle); + } + iterator.remove(); + } + } + } + } + } + private Set getAvailableBrokers() { try { return availableActiveBrokers.get(); @@ -414,15 +439,24 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach // Remove all loaded bundles from the preallocated maps. final Map preallocatedBundleData = brokerData.getPreallocatedBundleData(); - // Should not iterate with more than one thread at a time. synchronized (preallocatedBundleData) { - final Iterator> preallocatedIterator = preallocatedBundleData.entrySet() - .iterator(); - while (preallocatedIterator.hasNext()) { - final String bundle = preallocatedIterator.next().getKey(); - if (bundleData.containsKey(bundle)) { - preallocatedIterator.remove(); - preallocatedBundleToBroker.remove(bundle); + for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) { + if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) { + final Iterator> preallocatedIterator = preallocatedBundleData.entrySet() + .iterator(); + while (preallocatedIterator.hasNext()) { + final String bundle = preallocatedIterator.next().getKey(); + + if (bundleData.containsKey(bundle)) { + preallocatedIterator.remove(); + preallocatedBundleToBroker.remove(bundle); + } + } + } + + // This is needed too in case a broker which was assigned a bundle dies and comes back up. + if ( preallocatedBundleToBroker.containsKey(preallocatedBundleName) ) { + preallocatedBundleToBroker.remove(preallocatedBundleName); } } } -- GitLab