提交 fc109317 编写于 作者: B bobbeyreese 提交者: Rajan

Fix bug causing removed brokers to be considered as candidates (#343)

* Make candidate selection only consider active brokers
上级 8bc42352
......@@ -161,7 +161,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);
......@@ -347,7 +347,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
try {
Set<String> activeBrokers = availableActiveBrokers.get();
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
for (String broker : activeBrokers) {
for (final String broker : activeBrokers) {
try {
String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
final LocalBrokerData localData = brokerDataCache.get(key)
......@@ -357,16 +357,22 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Replace previous local broker data.
brokerDataMap.get(broker).setLocalData(localData);
} else {
// Initialize BrokerData object for previously unseen
// brokers.
// Initialize BrokerData object for previously unseen brokers.
brokerDataMap.put(broker, new BrokerData(localData));
}
} catch (Exception e) {
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
}
}
// Remove obsolete brokers.
for (final String broker : brokerDataMap.keySet()) {
if (!activeBrokers.contains(broker)) {
brokerDataMap.remove(broker);
}
}
} catch (Exception e) {
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]", e.getMessage());
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]",
e.getMessage());
}
}
......@@ -485,27 +491,42 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
@Override
public synchronized String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, loadData.getBrokerData().keySet());
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Use the filter pipeline to finalize broker candidates.
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
final String broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// Use brokerCandidateCache as a lock to reduce synchronization.
synchronized(brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
Set<String> activeBrokers;
try {
activeBrokers = availableActiveBrokers.get();
} catch (Exception e) {
// Try-catch block inserted because ZooKeeperChildrenCache.get throws checked exception, though we
// should not really see this happen unless something goes very wrong.
log.warn("Unexpected error when trying to get active brokers", e);
// Fall back to using loadData key set.
activeBrokers = loadData.getBrokerData().keySet();
}
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, activeBrokers);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Add new bundle to preallocated.
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);
return broker;
// Use the filter pipeline to finalize broker candidates.
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
final String broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
// Add new bundle to preallocated.
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);
return broker;
}
}
/**
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
public class ModularLoadManagerImplTest {
private LocalBookkeeperEnsemble bkEnsemble;
private URL url1;
private PulsarService pulsar1;
private PulsarAdmin admin1;
private URL url2;
private PulsarService pulsar2;
private PulsarAdmin admin2;
private String primaryHost;
private String secondaryHost;
private NamespaceBundleFactory nsFactory;
private ModularLoadManagerImpl primaryLoadManager;
private ModularLoadManagerImpl secondaryLoadManager;
private ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort();
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImplTest.class);
static {
System.setProperty("test.basePort", "16100");
}
// Invoke non-overloaded method.
private Object invokeSimpleMethod(final Object instance, final String methodName, final Object... args)
throws Exception {
for (Method method : instance.getClass().getDeclaredMethods()) {
if (method.getName().equals(methodName)) {
method.setAccessible(true);
return method.invoke(instance, args);
}
}
throw new IllegalArgumentException("Method not found: " + methodName);
}
private static Object getField(final Object instance, final String fieldName) throws Exception {
final Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(instance);
}
private static void setField(final Object instance, final String fieldName, final Object value) throws Exception {
final Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(instance, value);
}
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setClusterName("use");
config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
pulsar1 = new PulsarService(config1);
pulsar1.start();
primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
admin1 = new PulsarAdmin(url1, (Authentication) null);
// Start broker 2
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setClusterName("use");
config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
pulsar2 = new PulsarService(config2);
secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
SECONDARY_BROKER_WEBSERVICE_PORT);
pulsar2.start();
url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT);
admin2 = new PulsarAdmin(url2, (Authentication) null);
primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
Thread.sleep(100);
}
@AfterMethod
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
admin1.close();
admin2.close();
pulsar2.close();
pulsar1.close();
bkEnsemble.stop();
}
private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
return nsFactory.getBundle(new NamespaceName(property, cluster, namespace),
Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
BoundType.CLOSED));
}
private NamespaceBundle makeBundle(final String all) {
return makeBundle(all, all, all);
}
@Test
public void testCandidateConsistency() throws Exception {
boolean foundFirst = false;
boolean foundSecond = false;
// After 2 selections, the load balancer should select both brokers due to preallocation.
for (int i = 0; i < 2; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
if (broker.equals(primaryHost)) {
foundFirst = true;
} else {
foundSecond = true;
}
}
assert (foundFirst && foundSecond);
// Now disable the secondary broker.
secondaryLoadManager.disableBroker();
LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
// Give some time for the watch to fire.
Thread.sleep(500);
// Make sure the second broker is not in the internal map.
assert (!loadData.getBrokerData().containsKey(secondaryHost));
// Try 5 more selections, ensure they all go to the first broker.
for (int i = 2; i < 7; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
assert (primaryLoadManager.selectBrokerForAssignment(serviceUnit).equals(primaryHost));
}
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance;
import java.util.Map;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册