提交 70b2a352 编写于 作者: B Brad McMillen 提交者: Matteo Merli

Remove bundle from the ephemeral preallocated map once it is loaded (#481)

上级 c0f8b0d4
......@@ -182,6 +182,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
if (log.isDebugEnabled()) {
log.debug("Update Received for path {}", path);
}
reapDeadBrokerPreallocations(data);
scheduler.submit(ModularLoadManagerImpl.this::updateAll);
}
});
......@@ -244,6 +245,30 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
// For each broker that we have a recent load report, see if they are still alive
private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
for ( String broker : loadData.getBrokerData().keySet() ) {
if ( !aliveBrokers.contains(broker)) {
if ( log.isDebugEnabled() ) {
log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", broker);
}
final Iterator<Map.Entry<String, String>> iterator = preallocatedBundleToBroker.entrySet().iterator();
while ( iterator.hasNext() ) {
Map.Entry<String, String> entry = iterator.next();
final String preallocatedBundle = entry.getKey();
final String preallocatedBroker = entry.getValue();
if ( broker.equals(preallocatedBroker) ) {
if ( log.isDebugEnabled() ) {
log.debug("Removing old preallocation on dead broker {} for bundle {}",
preallocatedBroker, preallocatedBundle);
}
iterator.remove();
}
}
}
}
}
private Set<String> getAvailableBrokers() {
try {
return availableActiveBrokers.get();
......@@ -414,15 +439,24 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
// Should not iterate with more than one thread at a time.
synchronized (preallocatedBundleData) {
final Iterator<Map.Entry<String, BundleData>> preallocatedIterator = preallocatedBundleData.entrySet()
.iterator();
while (preallocatedIterator.hasNext()) {
final String bundle = preallocatedIterator.next().getKey();
if (bundleData.containsKey(bundle)) {
preallocatedIterator.remove();
preallocatedBundleToBroker.remove(bundle);
for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) {
if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) {
final Iterator<Map.Entry<String, BundleData>> preallocatedIterator = preallocatedBundleData.entrySet()
.iterator();
while (preallocatedIterator.hasNext()) {
final String bundle = preallocatedIterator.next().getKey();
if (bundleData.containsKey(bundle)) {
preallocatedIterator.remove();
preallocatedBundleToBroker.remove(bundle);
}
}
}
// This is needed too in case a broker which was assigned a bundle dies and comes back up.
if ( preallocatedBundleToBroker.containsKey(preallocatedBundleName) ) {
preallocatedBundleToBroker.remove(preallocatedBundleName);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册