提交 de2c7ee8 编写于 作者: M Matteo Merli 提交者: rdhabalia

Refactoring of DiscoveryService

上级 d93b233a
......@@ -18,7 +18,8 @@ package com.yahoo.pulsar.discovery.service;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Client;
......@@ -38,7 +39,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
......@@ -64,22 +64,22 @@ public class DiscoveryServiceTest extends ProducerConsumerBase {
/**
* 1. Start : Broker and Discovery service. 2. Provide started broker server as active broker to Discovery service
* 3. Call GET, PUT, POST request to discovery service that redirects to Broker service and receives response
*
*
* @throws Exception
*/
@Test
public void testRiderectUrlWithServerStarted() throws Exception {
// 1. start server
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
System.setProperty("zooKeeperSessionTimeoutMillis", "1000");
int port = PortManager.nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
server.start(resources);
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zooKeeperSessionTimeoutMillis", "1000");
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
ZookeeperCacheLoader.availableActiveBrokers.add(super.brokerUrl.getHost() + ":" + super.brokerUrl.getPort());
......@@ -122,7 +122,8 @@ public class DiscoveryServiceTest extends ProducerConsumerBase {
fail();
}
JSONObject jsonObject = new JSONObject(response.readEntity(String.class));
String s = response.readEntity(String.class);
JSONObject jsonObject = new JSONObject();
String serviceResponse = jsonObject.getString("reason");
return serviceResponse;
}
......
......@@ -32,7 +32,7 @@ public class ZookeeperCacheLoader {
// dummy constructor
}
public List<String> getAvailableActiveBrokers() {
public List<String> getAvailableBrokers() {
return this.availableActiveBrokers;
}
......
......@@ -141,6 +141,14 @@
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-zookeeper-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
......
......@@ -15,33 +15,25 @@
*/
package com.yahoo.pulsar.discovery.service;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.System.getProperty;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.collections.CollectionUtils;
import org.glassfish.jersey.server.ContainerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.wordnik.swagger.annotations.ApiOperation;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
/**
* Acts a load-balancer that receives any incoming request and discover active-available broker in round-robin manner
......@@ -51,95 +43,117 @@ import com.wordnik.swagger.annotations.ApiOperation;
* </p>
*
*/
@Singleton
@Path("{apiPath:.*}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.APPLICATION_OCTET_STREAM,
MediaType.MULTIPART_FORM_DATA })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.APPLICATION_OCTET_STREAM,
MediaType.MULTIPART_FORM_DATA })
public class DiscoveryService {
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
private static final AtomicInteger count = new AtomicInteger();
private static final ZookeeperCacheLoader zkCache = getZookeeperCacheLoader();
@POST
@ApiOperation(value = "Redirect POST request to broker")
public Response post(@Context ContainerRequest headers) {
return redirect(headers);
public class DiscoveryServiceServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private final AtomicInteger counter = new AtomicInteger();
private ZookeeperCacheLoader zkCache;
@Override
public void init(ServletConfig config) throws ServletException {
log.info("Initializing DiscoveryServiceServlet resource");
String zookeeperServers = config.getInitParameter("zookeeperServers");
String zookeeperClientFactoryClassName = config.getInitParameter("zookeeperClientFactoryClass");
if (zookeeperClientFactoryClassName == null) {
zookeeperClientFactoryClassName = ZookeeperClientFactoryImpl.class.getName();
}
log.info("zookeeperServers={} zookeeperClientFactoryClass={}", zookeeperServers,
zookeeperClientFactoryClassName);
try {
ZooKeeperClientFactory zkClientFactory = (ZooKeeperClientFactory) Class
.forName(zookeeperClientFactoryClassName).newInstance();
zkCache = new ZookeeperCacheLoader(zkClientFactory, zookeeperServers);
} catch (Throwable t) {
throw new ServletException(t);
}
}
@GET
@ApiOperation(value = "Redirect GET request to broker")
public Response get(@Context ContainerRequest headers) throws IOException {
return redirect(headers);
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@PUT
@ApiOperation(value = "Redirect PUT request to broker")
public Response put(@Context ContainerRequest headers) {
return redirect(headers);
@Override
protected void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
/**
* redirect request to given direct http uri path
*
* @param path
* @return
*/
protected Response redirect(ContainerRequest headers) {
checkNotNull(headers);
String scheme = headers.getBaseUri().getScheme();
String path = headers.getPath(false);
URI location;
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doOptions(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
private void redirect(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
try {
String url = (new StringBuilder(scheme)).append("://").append(nextBroker()).append("/").append(path)
.toString();
location = new URI(url);
LoadReport broker = nextBroker();
URI brokerURI;
if (request.getScheme().equals("http")) {
// Use normal HTTP url
brokerURI = new URI(broker.getWebServiceUrl());
} else {
brokerURI = new URI(broker.getWebServiceUrlTls());
}
StringBuilder location = new StringBuilder();
location.append(brokerURI.getScheme()).append("://").append(brokerURI.getHost()).append(':')
.append(brokerURI.getPort()).append(request.getRequestURI());
if (request.getQueryString() != null) {
location.append('?').append(request.getQueryString());
}
log.info("Redirecting to {}", location);
response.sendRedirect(location.toString());
} catch (URISyntaxException e) {
log.warn("No broker found in zookeeper {}", e.getMessage(), e);
throw new RestException(Status.SERVICE_UNAVAILABLE, "Broker is not available");
}
return Response.temporaryRedirect(location).build();
}
/**
* Find next broke url in round-robin
*
*
* @return
*/
public String nextBroker() {
if (!CollectionUtils.isEmpty(availableActiveBrokers())) {
int next = count.getAndIncrement() % availableActiveBrokers().size();
return availableActiveBrokers().get(next);
} else {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
}
}
public List<String> availableActiveBrokers() {
return zkCache.getAvailableActiveBrokers();
}
LoadReport nextBroker() {
List<LoadReport> availableBrokers = zkCache.getAvailableBrokers();
/**
* initialize {@link ZookeeperCacheLoader} instance by creating ZooKeeper connection and broker connection
*
* @return {@link ZookeeperCacheLoader} to fetch available broker list
*/
private static ZookeeperCacheLoader getZookeeperCacheLoader() {
String zookeeperServers = checkNotNull(getProperty("zookeeperServers"), "zookeeperServers property not set");
ZookeeperCacheLoader zkCacheLoader;
try {
zkCacheLoader = new ZookeeperCacheLoader(zookeeperServers);
} catch (InterruptedException e) {
log.warn("Failed to fetch broker list from ZooKeeper, {}", e.getMessage(), e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (IOException e) {
log.warn("Failed to create ZooKeeper session, {}", e.getMessage(), e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
if (availableBrokers.isEmpty()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
} else {
int brokersCount = availableBrokers.size();
int nextIdx = Math.abs(counter.getAndIncrement()) % brokersCount;
return availableBrokers.get(nextIdx);
}
return zkCacheLoader;
}
private static final Logger log = LoggerFactory.getLogger(DiscoveryService.class);
private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceServlet.class);
}
......@@ -15,132 +15,108 @@
*/
package com.yahoo.pulsar.discovery.service;
import java.io.IOException;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperConnectionService;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker list.
*
*
*/
public class ZookeeperCacheLoader {
// Zookeeper quorum connection string
private String zookeeperServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
private ZooKeeperCache localZkCache;
private LocalZooKeeperConnectionService localZkConnectionSvc;
private ZooKeeperClientFactory zkClientFactory = null;
private ZooKeeperChildrenCache availableActiveBrokerCache;
private volatile List<String> availableActiveBrokers;
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
public class ZookeeperCacheLoader implements Closeable {
private final ZooKeeperCache localZkCache;
private final LocalZooKeeperConnectionService localZkConnectionSvc;
private final ZooKeeperDataCache<LoadReport> brokerInfo;
private final ZooKeeperChildrenCache availableBrokersCache;
private volatile List<LoadReport> availableBrokers;
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery");
static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
private static final int zooKeeperSessionTimeoutMillis = 30_000;
/**
* Initialize ZooKeeper session and creates broker cache list
*
*
* @param zookeeperServers
* @throws InterruptedException
* : when failed to fetch broker list from cache
* @throws IOException
* : when failed create ZooKeeper session
* @throws Exception
*/
public ZookeeperCacheLoader(String zookeeperServers) throws InterruptedException, IOException {
this.zookeeperServers = zookeeperServers;
start();
}
/**
* starts ZooKeeper session
*
* @throws InterruptedException
* : when failed to fetch broker list from cache
* @throws IOException
* : when failed create zk session
*/
public void start() throws InterruptedException, IOException {
localZkConnectionSvc = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), zookeeperServers,
public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zookeeperServers) throws Exception {
localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
zooKeeperSessionTimeoutMillis);
localZkConnectionSvc.start(new ShutdownService() {
@Override
public void shutdown(int exitCode) {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
}
}
localZkConnectionSvc.start(exitCode -> {
log.error("Shutting down ZK sessions: {}", exitCode);
});
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
// attach ZooKeeperChildrenCache listener
initializeBrokerList();
}
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
}
});
/**
* 1. creates ZooKeeper Children cache on path {@value LOADBALANCE_BROKERS_ROOT}, 2. sets watch on the path and 3.
* maintain list of available brokers at availableActiveBrokers
*
* @throws InterruptedException
* @throws IOException
*
*/
private void initializeBrokerList() throws InterruptedException, IOException {
this.availableActiveBrokerCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableActiveBrokerCache.registerListener(new ZooKeeperCacheListener<Set<String>>() {
this.brokerInfo = new ZooKeeperDataCache<LoadReport>(localZkCache) {
@Override
public void onUpdate(String path, Set<String> data, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("Update Received for path {}", path);
}
availableActiveBrokers = Lists.newArrayList(data);
public LoadReport deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, LoadReport.class);
}
};
this.availableBrokersCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableBrokersCache.registerListener((path, brokerNodes, stat) -> {
try {
updateBrokerList(brokerNodes);
} catch (Exception e) {
log.warn("Error updating broker info after broker list changed.", e);
}
});
// initialize available broker list
try {
this.availableActiveBrokers = Lists.newArrayList(availableActiveBrokerCache.get());
} catch (KeeperException e) {
log.warn("Failed to find broker znode children under {}", LOADBALANCE_BROKERS_ROOT, e);
throw new IOException(String.format("Failed to find broker list in zk at %s with %s ",
LOADBALANCE_BROKERS_ROOT, e.getMessage()), e);
}
}
private ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
// Do initial fetch of brokers list
updateBrokerList(availableBrokersCache.get());
}
public List<String> getAvailableActiveBrokers() {
return this.availableActiveBrokers;
public List<LoadReport> getAvailableBrokers() {
return availableBrokers;
}
public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
@Override
public void close() {
orderedExecutor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
List<LoadReport> availableBrokers = new ArrayList<>(brokerNodes.size());
for (String broker : brokerNodes) {
availableBrokers.add(brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker));
}
this.availableBrokers = availableBrokers;
}
private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheLoader.class);
}
......@@ -26,19 +26,18 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.DiscoveryServiceServlet;
/**
*
* Starts jetty server and initialize {@link DiscoveryService} web-service
*
* Starts jetty server and initialize {@link DiscoveryServiceServlet} web-service
*
*/
public class DiscoveryServiceStarter {
......@@ -65,17 +64,20 @@ public class DiscoveryServiceStarter {
}
}
});
// add servlet
final List<String> servletPackages = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", config.getZookeeperServers());
Map<String, String> initParameters = new TreeMap<>();
initParameters.put("zookeeperServers", config.getZookeeperServers());
server.addServlet("/*", DiscoveryServiceServlet.class, initParameters);
// start web-service
server.start(servletPackages);
server.start();
log.info("Discovery service is started at {}", server.getServiceUri().toString());
}
public static void main(String[] args) {
checkArgument(args.length == 1, "Need to specify a configuration file");
try {
// load config file and start server
init(args[0]);
......
......@@ -15,17 +15,16 @@
*/
package com.yahoo.pulsar.discovery.service.server;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.net.ssl.SSLContext;
import javax.servlet.Servlet;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
......@@ -38,13 +37,11 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.RestException;
import io.netty.util.concurrent.DefaultThreadFactory;
......@@ -79,7 +76,8 @@ public class ServerManager {
} catch (GeneralSecurityException e) {
throw new RestException(e);
}
sslCtxFactory.setWantClientAuth(true);
sslCtxFactory.setWantClientAuth(false);
ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
connectors.add(tlsConnector);
......@@ -94,16 +92,13 @@ public class ServerManager {
return this.server.getURI();
}
public void addRestResources(String basePath, String javaPackages) {
ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
servletHolder.setInitParameter("jersey.config.server.provider.packages", javaPackages);
addServlet(basePath, servletHolder);
}
public void addServlet(String path, ServletHolder servletHolder) {
public void addServlet(String path, Class<? extends Servlet> servlet, Map<String, String> initParameters) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path);
context.addServlet(servletHolder, "/*");
ServletHolder holder = new ServletHolder(servlet);
holder.setInitParameters(initParameters);
context.addServlet(holder, path);
handlers.add(context);
}
......@@ -111,29 +106,26 @@ public class ServerManager {
return externalServicePort;
}
protected void start() throws Exception {
try {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
public void start() throws Exception {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });
server.setHandler(handlerCollection);
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });
server.setHandler(handlerCollection);
server.start();
server.start();
} catch (Exception e) {
throw new Exception(e);
}
log.info("Server started at end point {}", getServiceUri());
}
public void stop() throws Exception {
......@@ -142,13 +134,5 @@ public class ServerManager {
log.info("Server stopped successfully");
}
public void start(List<String> resources) throws Exception {
if (resources != null) {
resources.forEach(r -> this.addRestResources("/", DiscoveryService.class.getPackage().getName()));
}
this.start();
log.info("Server started at end point {}", getServiceUri().toString());
}
private static final Logger log = LoggerFactory.getLogger(ServerManager.class);
}
......@@ -15,10 +15,10 @@
*/
package com.yahoo.pulsar.discovery.service.server;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.DiscoveryServiceServlet;
/**
* Service Configuration to start :{@link DiscoveryService}
* Service Configuration to start :{@link DiscoveryServiceServlet}
*
*/
public class ServiceConfig {
......
......@@ -15,7 +15,7 @@
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import static com.yahoo.pulsar.discovery.service.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
......@@ -30,19 +30,14 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.List;
import java.util.stream.Collectors;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.ws.rs.HttpMethod;
......@@ -64,7 +59,6 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.discovery.service.server.DiscoveryServiceStarter;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
......@@ -75,7 +69,7 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
* 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and
* POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception
* with redirected broker
*
*
*/
public class DiscoveryServiceTest {
......@@ -89,8 +83,8 @@ public class DiscoveryServiceTest {
List<String> brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000");
System.setProperty("zookeeperServers", "dummy-value");
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
DiscoveryServiceServlet discovery = new DiscoveryServiceServlet();
Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
......@@ -120,17 +114,17 @@ public class DiscoveryServiceTest {
public void testRiderectUrlWithServerStarted() throws Exception {
// 1. start server
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
List<String> resources = Lists.newArrayList(DiscoveryServiceServlet.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
int port = nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
server.start(resources);
server.start();
// 2. get ZookeeperCacheLoader to add more brokers
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
DiscoveryServiceServlet discovery = new DiscoveryServiceServlet();
Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
......@@ -195,9 +189,6 @@ public class DiscoveryServiceTest {
public void testTlsEnable() throws Exception {
// 1. start server with tls enable
final boolean allowInsecure = false;
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
int port = nextFreePort();
int tlsPort = nextFreePort();
ServiceConfig config = new ServiceConfig();
......@@ -207,11 +198,11 @@ public class DiscoveryServiceTest {
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
ServerManager server = new ServerManager(config);
server.start(resources);
server.start();
// 2. get ZookeeperCacheLoader to add more brokers
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
DiscoveryServiceServlet discovery = new DiscoveryServiceServlet();
Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
final String redirect_broker_host = "broker-1";
......
......@@ -15,16 +15,14 @@
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import static com.yahoo.pulsar.discovery.service.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
......@@ -32,32 +30,33 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.beust.jcommander.internal.Lists;
import com.yahoo.pulsar.discovery.service.ZookeeperCacheLoader;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
public class ZookeeperCacheLoaderTest {
private MockZooKeeper mockZookKeeper;
private ZooKeeperClientFactory mockZookKeeperFactory;
@BeforeMethod
void setup() throws Exception {
mockZookKeeper = MockZooKeeper.newInstance();
mockZookKeeperFactory = new MockedZooKeeperClientFactoryImpl();
}
@AfterMethod
void teardown() throws Exception {
mockZookKeeper.shutdown();
}
/**
* Create znode for available broker in ZooKeeper and updates it again to verify ZooKeeper cache update
*
*
* @throws InterruptedException
* @throws KeeperException
* @throws IOException
*/
@Test
public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, IOException {
ZookeeperCacheLoader zkLoader = new ZookeeperCacheLoader(null);
public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, Exception {
ZookeeperCacheLoader zkLoader = new ZookeeperCacheLoader(mockZookKeeperFactory, "");
List<String> brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000");
// 1. create znode for each broker
......@@ -73,7 +72,7 @@ public class ZookeeperCacheLoaderTest {
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 2. get available brokers from ZookeeperCacheLoader
List<String> list = zkLoader.getAvailableActiveBrokers();
List<LoadReport> list = zkLoader.getAvailableBrokers();
// 3. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
......@@ -86,7 +85,7 @@ public class ZookeeperCacheLoaderTest {
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 4.b. get available brokers from ZookeeperCacheLoader
list = zkLoader.getAvailableActiveBrokers();
list = zkLoader.getAvailableBrokers();
// 4.c. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.zookeeper;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import java.io.IOException;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.yahoo.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
/**
*
* Test Mock LocalZooKeeperConnectionService
*
*/
public class LocalZooKeeperConnectionService {
private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperConnectionService.class);
private final ZooKeeperClientFactory zkClientFactory;
private final String zkConnect;
private final long zkSessionTimeoutMillis;
private ZooKeeper localZooKeeper;
private ZooKeeperSessionWatcher localZooKeeperSessionWatcher;
public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, String zkConnect,
long zkSessionTimeoutMillis) {
this.zkClientFactory = zkClientFactory;
this.zkConnect = zkConnect;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
}
public void start(ShutdownService shutdownService) throws IOException {
// mock zk
try {
localZooKeeper = MockZooKeeper.newInstance();
ZkUtils.createFullPathOptimistic(localZooKeeper, LOADBALANCE_BROKERS_ROOT, "test".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis,
shutdownService);
localZooKeeperSessionWatcher.start();
localZooKeeper.register(localZooKeeperSessionWatcher);
} catch (Exception e) {
throw new IOException("Failed to establish session with local ZK", e);
}
}
public void close() throws IOException {
if (localZooKeeper != null) {
try {
localZooKeeper.close();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
if (localZooKeeperSessionWatcher != null) {
localZooKeeperSessionWatcher.close();
}
}
public ZooKeeper getLocalZooKeeper() {
return this.localZooKeeper;
}
/**
* Check if a persist node exists. If not, it attempts to create the znode.
*
* @param path
* znode path
* @throws KeeperException
* zookeeper exception.
* @throws InterruptedException
* zookeeper exception.
*/
public static void checkAndCreatePersistNode(ZooKeeper zkc, String path)
throws KeeperException, InterruptedException {
// check if the node exists
if (zkc.exists(path, false) == null) {
/**
* create znode
*/
try {
// do create the node
zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("created znode, path={}", path);
} catch (Exception e) {
LOG.warn("create znode failed, path={} : {}", path, e.getMessage(), e);
}
}
}
public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data, createMode, false);
}
public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode, boolean gc)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data.getBytes(Charsets.UTF_8), createMode, gc);
}
public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data, createMode, false);
}
public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode, boolean gc)
throws KeeperException, InterruptedException {
String pathCreated = null;
try {
pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
} catch (NodeExistsException e) {
// OK
LOG.debug("Create skipped for existing znode: path={}", path);
}
// reset if what exists is the ephemeral garbage.
if (gc && (pathCreated == null) && CreateMode.EPHEMERAL.equals(createMode)) {
Stat stat = zk.exists(path, false);
if (stat != null && zk.getSessionId() != stat.getEphemeralOwner()) {
deleteIfExists(zk, path, -1);
pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
}
}
return pathCreated;
}
public static void deleteIfExists(ZooKeeper zk, String path, int version)
throws KeeperException, InterruptedException {
try {
zk.delete(path, version);
} catch (NoNodeException e) {
// OK
LOG.debug("Delete skipped for non-existing znode: path={}", path);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册