提交 911a9b67 编写于 作者: R Rajan 提交者: GitHub

deserialize load report based on load-manager (#338)

上级 4866563d
......@@ -28,6 +28,8 @@ import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
......@@ -57,6 +59,13 @@ public interface LoadManager {
* Generate the load report
*/
LoadReport generateLoadReport() throws Exception;
/**
* Returns {@link Deserializer} to deserialize load report
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
/**
* Set flag to force load report update
......
......@@ -18,6 +18,8 @@ package com.yahoo.pulsar.broker.loadbalance;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* New proposal for a load manager interface which attempts to use more intuitive method names and provide a starting
......@@ -88,4 +90,11 @@ public interface ModularLoadManager {
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
void writeBundleDataOnZooKeeper();
/**
* Return :{@link Deserializer} to deserialize load-manager load report
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
}
......@@ -15,6 +15,8 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
......@@ -65,6 +67,7 @@ import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
......@@ -158,6 +161,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);
/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
......@@ -579,6 +585,11 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
@Override
public Deserializer<LocalBrokerData> getLoadReportDeserializer() {
return loadReportDeserializer;
}
/**
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
......
......@@ -26,6 +26,8 @@ import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
......@@ -103,4 +105,9 @@ public class ModularLoadManagerWrapper implements LoadManager {
public void writeResourceQuotasToZooKeeper() {
loadManager.writeBundleDataOnZooKeeper();
}
@Override
public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
return loadManager.getLoadReportDeserializer();
}
}
......@@ -73,9 +73,11 @@ import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {
......@@ -174,6 +176,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private long lastResourceUsageTimestamp = -1;
// flag to force update load report
private boolean forceLoadReportUpdate = false;
private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LoadReport.class);
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
......@@ -313,6 +317,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
}
@Override
public Deserializer<LoadReport> getLoadReportDeserializer() {
return loadReportDeserializer;
}
public ZooKeeperChildrenCache getActiveBrokersCache() {
return this.availableActiveBrokers;
}
......
......@@ -150,10 +150,6 @@ public class NamespaceService {
return bundleFactory.getFullBundle(fqnn);
}
private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) ->
jsonMapper().readValue(content, ServiceLookupData.class);
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
......@@ -399,7 +395,7 @@ public class NamespaceService {
}
}
private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
......@@ -407,7 +403,7 @@ public class NamespaceService {
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
pulsar.getLocalZkCache().getDataAsync(path, serviceLookupDataDeserializer).thenAccept(reportData -> {
pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
if (reportData.isPresent()) {
ServiceLookupData lookupData = reportData.get();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
......
......@@ -31,14 +31,19 @@ import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -49,9 +54,18 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
......@@ -62,9 +76,13 @@ import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
public class NamespaceServiceTest extends BrokerTestBase {
......@@ -279,7 +297,43 @@ public class NamespaceServiceTest extends BrokerTestBase {
// ok
}
}
/**
* <pre>
* It verifies that namespace service deserialize the load-report based on load-manager which active.
* 1. write candidate1- load report using {@link LoadReport} which is used by SimpleLoadManagerImpl
* 2. Write candidate2- load report using {@link LocalBrokerData} which is used by ModularLoadManagerImpl
* 3. try to get Lookup Result based on active load-manager
* </pre>
* @throws Exception
*/
@Test
public void testLoadReportDeserialize() throws Exception {
final String candidateBroker1 = "http://localhost:8000";
final String candidateBroker2 = "http://localhost:3000";
LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
URI uri1 = new URI(candidateBroker1);
URI uri2 = new URI(candidateBroker2);
String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();
// update to new load mananger
pulsar.getLoadManager().set(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2).get();
Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
System.out.println(result2);
}
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册