提交 de86ce9d 编写于 作者: M Matteo Merli 提交者: GitHub

Merge pull request #27 from rdhabalia/discTest

Refactoring Discovery Service and Https support
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* This is a dummy class will be overridden over {@link ZookeeperCacheLoader} in discovery module in order to avoid
* ZooKeeper initialization
*
*/
public class ZookeeperCacheLoader {
public static final List<String> availableActiveBrokers = new ArrayList<String>();
public ZookeeperCacheLoader(String zookeeperServers) throws InterruptedException, IOException {
// dummy constructor
}
public List<String> getAvailableActiveBrokers() {
return this.availableActiveBrokers;
}
public void start() throws InterruptedException, IOException {
// dummy method
}
}
......@@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
package com.yahoo.pulsar.discovery.service.web;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Client;
......@@ -30,6 +32,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.test.PortManager;
import org.apache.zookeeper.ZooKeeper;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.filter.LoggingFilter;
import org.json.JSONException;
......@@ -43,8 +46,10 @@ import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.discovery.service.web.DiscoveryServiceServlet;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
public class DiscoveryServiceTest extends ProducerConsumerBase {
public class DiscoveryServiceWebTest extends ProducerConsumerBase {
private Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFilter.class));
......@@ -64,56 +69,52 @@ public class DiscoveryServiceTest extends ProducerConsumerBase {
/**
* 1. Start : Broker and Discovery service. 2. Provide started broker server as active broker to Discovery service
* 3. Call GET, PUT, POST request to discovery service that redirects to Broker service and receives response
*
*
* @throws Exception
*/
@Test
public void testRiderectUrlWithServerStarted() throws Exception {
// 1. start server
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
System.setProperty("zooKeeperSessionTimeoutMillis", "1000");
int port = PortManager.nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
server.start(resources);
ZookeeperCacheLoader.availableActiveBrokers.add(super.brokerUrl.getHost() + ":" + super.brokerUrl.getPort());
Thread.sleep(200);
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
String serviceUrl = server.getServiceUri().toString();
String putRequestUrl = serviceUrl + "admin/namespaces/p1/c1/n1";
String postRequestUrl = serviceUrl + "admin/namespaces/p1/c1/n1/permissions/test-role";
String postRequestUrl = serviceUrl + "admin/namespaces/p1/c1/n1/replication";
String getRequestUrl = serviceUrl + "admin/namespaces/p1";
/**
* verify : every time when vip receives a request: it redirects to above brokers sequentially and broker
* returns appropriate response which must not be null.
**/
assertNotNull(hitBrokerService(HttpMethod.POST, postRequestUrl, null));
assertNotNull(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)));
assertNotNull(hitBrokerService(HttpMethod.GET, getRequestUrl, null));
assertEquals("Cannot get the replication clusters for a non-global namespace", hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")));
assertEquals("Property does not exist", hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)));
assertEquals("Property does not exist", hitBrokerService(HttpMethod.GET, getRequestUrl, null));
server.stop();
}
public String hitBrokerService(String method, String url, BundlesData bundle) throws JSONException {
public String hitBrokerService(String method, String url, Object data) throws JSONException {
Response response = null;
try {
WebTarget webTarget = client.target(url);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
if (HttpMethod.PUT.equals(method)) {
response = (Response) invocationBuilder.put(Entity.entity(bundle, MediaType.APPLICATION_JSON));
response = (Response) invocationBuilder.put(Entity.entity(data, MediaType.APPLICATION_JSON));
} else if (HttpMethod.GET.equals(method)) {
response = (Response) invocationBuilder.get();
} else if (HttpMethod.POST.equals(method)) {
response = (Response) invocationBuilder.post(Entity.entity(bundle, MediaType.APPLICATION_JSON));
response = (Response) invocationBuilder.post(Entity.entity(data, MediaType.APPLICATION_JSON));
} else {
fail("Unsupported http method");
}
......@@ -126,5 +127,15 @@ public class DiscoveryServiceTest extends ProducerConsumerBase {
String serviceResponse = jsonObject.getString("reason");
return serviceResponse;
}
static class DiscoveryZooKeeperClientFactoryImpl implements ZooKeeperClientFactory {
static ZooKeeper zk;
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
return CompletableFuture.completedFuture(zk);
}
}
}
......@@ -141,6 +141,14 @@
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-zookeeper-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.System.getProperty;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.collections.CollectionUtils;
import org.glassfish.jersey.server.ContainerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.wordnik.swagger.annotations.ApiOperation;
/**
* Acts a load-balancer that receives any incoming request and discover active-available broker in round-robin manner
* and redirect request to that broker.
* <p>
* Accepts any {@value GET, PUT, POST} request and redirects to available broker-server to serve the request
* </p>
*
*/
@Singleton
@Path("{apiPath:.*}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.APPLICATION_OCTET_STREAM,
MediaType.MULTIPART_FORM_DATA })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.APPLICATION_OCTET_STREAM,
MediaType.MULTIPART_FORM_DATA })
public class DiscoveryService {
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
private static final AtomicInteger count = new AtomicInteger();
private static final ZookeeperCacheLoader zkCache = getZookeeperCacheLoader();
@POST
@ApiOperation(value = "Redirect POST request to broker")
public Response post(@Context ContainerRequest headers) {
return redirect(headers);
}
@GET
@ApiOperation(value = "Redirect GET request to broker")
public Response get(@Context ContainerRequest headers) throws IOException {
return redirect(headers);
}
@PUT
@ApiOperation(value = "Redirect PUT request to broker")
public Response put(@Context ContainerRequest headers) {
return redirect(headers);
}
/**
* redirect request to given direct http uri path
*
* @param path
* @return
*/
protected Response redirect(ContainerRequest headers) {
checkNotNull(headers);
String scheme = headers.getBaseUri().getScheme();
String path = headers.getPath(false);
URI location;
try {
String url = (new StringBuilder(scheme)).append("://").append(nextBroker()).append("/").append(path)
.toString();
location = new URI(url);
} catch (URISyntaxException e) {
log.warn("No broker found in zookeeper {}", e.getMessage(), e);
throw new RestException(Status.SERVICE_UNAVAILABLE, "Broker is not available");
}
return Response.temporaryRedirect(location).build();
}
/**
* Find next broke url in round-robin
*
* @return
*/
public String nextBroker() {
if (!CollectionUtils.isEmpty(availableActiveBrokers())) {
int next = count.getAndIncrement() % availableActiveBrokers().size();
return availableActiveBrokers().get(next);
} else {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
}
}
public List<String> availableActiveBrokers() {
return zkCache.getAvailableActiveBrokers();
}
/**
* initialize {@link ZookeeperCacheLoader} instance by creating ZooKeeper connection and broker connection
*
* @return {@link ZookeeperCacheLoader} to fetch available broker list
*/
private static ZookeeperCacheLoader getZookeeperCacheLoader() {
String zookeeperServers = checkNotNull(getProperty("zookeeperServers"), "zookeeperServers property not set");
ZookeeperCacheLoader zkCacheLoader;
try {
zkCacheLoader = new ZookeeperCacheLoader(zookeeperServers);
} catch (InterruptedException e) {
log.warn("Failed to fetch broker list from ZooKeeper, {}", e.getMessage(), e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (IOException e) {
log.warn("Failed to create ZooKeeper session, {}", e.getMessage(), e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
return zkCacheLoader;
}
private static final Logger log = LoggerFactory.getLogger(DiscoveryService.class);
}
......@@ -26,19 +26,18 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.web.DiscoveryServiceServlet;
/**
*
* Starts jetty server and initialize {@link DiscoveryService} web-service
*
* Starts jetty server and initialize {@link DiscoveryServiceServlet} web-service
*
*/
public class DiscoveryServiceStarter {
......@@ -65,17 +64,20 @@ public class DiscoveryServiceStarter {
}
}
});
// add servlet
final List<String> servletPackages = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", config.getZookeeperServers());
Map<String, String> initParameters = new TreeMap<>();
initParameters.put("zookeeperServers", config.getZookeeperServers());
server.addServlet("/*", DiscoveryServiceServlet.class, initParameters);
// start web-service
server.start(servletPackages);
server.start();
log.info("Discovery service is started at {}", server.getServiceUri().toString());
}
public static void main(String[] args) {
checkArgument(args.length == 1, "Need to specify a configuration file");
try {
// load config file and start server
init(args[0]);
......
......@@ -15,17 +15,16 @@
*/
package com.yahoo.pulsar.discovery.service.server;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.net.ssl.SSLContext;
import javax.servlet.Servlet;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
......@@ -38,14 +37,12 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.RestException;
import com.yahoo.pulsar.discovery.service.web.RestException;
import io.netty.util.concurrent.DefaultThreadFactory;
......@@ -79,7 +76,8 @@ public class ServerManager {
} catch (GeneralSecurityException e) {
throw new RestException(e);
}
sslCtxFactory.setWantClientAuth(true);
sslCtxFactory.setWantClientAuth(false);
ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
connectors.add(tlsConnector);
......@@ -94,16 +92,13 @@ public class ServerManager {
return this.server.getURI();
}
public void addRestResources(String basePath, String javaPackages) {
ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
servletHolder.setInitParameter("jersey.config.server.provider.packages", javaPackages);
addServlet(basePath, servletHolder);
}
public void addServlet(String path, ServletHolder servletHolder) {
public void addServlet(String path, Class<? extends Servlet> servlet, Map<String, String> initParameters) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path);
context.addServlet(servletHolder, "/*");
ServletHolder holder = new ServletHolder(servlet);
holder.setInitParameters(initParameters);
context.addServlet(holder, path);
handlers.add(context);
}
......@@ -111,29 +106,26 @@ public class ServerManager {
return externalServicePort;
}
protected void start() throws Exception {
try {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
public void start() throws Exception {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });
server.setHandler(handlerCollection);
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });
server.setHandler(handlerCollection);
server.start();
server.start();
} catch (Exception e) {
throw new Exception(e);
}
log.info("Server started at end point {}", getServiceUri());
}
public void stop() throws Exception {
......@@ -142,13 +134,5 @@ public class ServerManager {
log.info("Server stopped successfully");
}
public void start(List<String> resources) throws Exception {
if (resources != null) {
resources.forEach(r -> this.addRestResources("/", DiscoveryService.class.getPackage().getName()));
}
this.start();
log.info("Server started at end point {}", getServiceUri().toString());
}
private static final Logger log = LoggerFactory.getLogger(ServerManager.class);
}
......@@ -15,10 +15,10 @@
*/
package com.yahoo.pulsar.discovery.service.server;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.web.DiscoveryServiceServlet;
/**
* Service Configuration to start :{@link DiscoveryService}
* Service Configuration to start :{@link DiscoveryServiceServlet}
*
*/
public class ServiceConfig {
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.web;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
/**
* Acts a load-balancer that receives any incoming request and discover active-available broker in round-robin manner
* and redirect request to that broker.
* <p>
* Accepts any {@value GET, PUT, POST} request and redirects to available broker-server to serve the request
* </p>
*
*/
public class DiscoveryServiceServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private final AtomicInteger counter = new AtomicInteger();
private ZookeeperCacheLoader zkCache;
@Override
public void init(ServletConfig config) throws ServletException {
log.info("Initializing DiscoveryServiceServlet resource");
String zookeeperServers = config.getInitParameter("zookeeperServers");
String zookeeperClientFactoryClassName = config.getInitParameter("zookeeperClientFactoryClass");
if (zookeeperClientFactoryClassName == null) {
zookeeperClientFactoryClassName = ZookeeperClientFactoryImpl.class.getName();
}
log.info("zookeeperServers={} zookeeperClientFactoryClass={}", zookeeperServers,
zookeeperClientFactoryClassName);
try {
ZooKeeperClientFactory zkClientFactory = (ZooKeeperClientFactory) Class
.forName(zookeeperClientFactoryClassName).newInstance();
zkCache = new ZookeeperCacheLoader(zkClientFactory, zookeeperServers);
} catch (Throwable t) {
throw new ServletException(t);
}
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doOptions(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
@Override
protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}
private void redirect(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
try {
LoadReport broker = nextBroker();
URI brokerURI;
if (request.getScheme().equals("http")) {
// Use normal HTTP url
brokerURI = new URI(broker.getWebServiceUrl());
} else {
brokerURI = new URI(broker.getWebServiceUrlTls());
}
StringBuilder location = new StringBuilder();
location.append(brokerURI.getScheme()).append("://").append(brokerURI.getHost()).append(':')
.append(brokerURI.getPort()).append(request.getRequestURI());
if (request.getQueryString() != null) {
location.append('?').append(request.getQueryString());
}
if (log.isDebugEnabled()) {
log.info("Redirecting to {}", location);
}
response.sendRedirect(location.toString());
} catch (URISyntaxException e) {
log.warn("No broker found in zookeeper {}", e.getMessage(), e);
throw new RestException(Status.SERVICE_UNAVAILABLE, "Broker is not available");
}
}
/**
* Find next broke url in round-robin
*
* @return
*/
LoadReport nextBroker() {
List<LoadReport> availableBrokers = zkCache.getAvailableBrokers();
if (availableBrokers.isEmpty()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
} else {
int brokersCount = availableBrokers.size();
int nextIdx = Math.abs(counter.getAndIncrement()) % brokersCount;
return availableBrokers.get(nextIdx);
}
}
private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceServlet.class);
}
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
package com.yahoo.pulsar.discovery.service.web;
import java.io.PrintWriter;
import java.io.StringWriter;
......
......@@ -13,134 +13,110 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
package com.yahoo.pulsar.discovery.service.web;
import java.io.IOException;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperConnectionService;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker list.
*
*
*/
public class ZookeeperCacheLoader {
// Zookeeper quorum connection string
private String zookeeperServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
private ZooKeeperCache localZkCache;
private LocalZooKeeperConnectionService localZkConnectionSvc;
private ZooKeeperClientFactory zkClientFactory = null;
private ZooKeeperChildrenCache availableActiveBrokerCache;
private volatile List<String> availableActiveBrokers;
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
public class ZookeeperCacheLoader implements Closeable {
private final ZooKeeperCache localZkCache;
private final LocalZooKeeperConnectionService localZkConnectionSvc;
private final ZooKeeperDataCache<LoadReport> brokerInfo;
private final ZooKeeperChildrenCache availableBrokersCache;
private volatile List<LoadReport> availableBrokers;
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery");
static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
private static final int zooKeeperSessionTimeoutMillis = 30_000;
/**
* Initialize ZooKeeper session and creates broker cache list
*
*
* @param zookeeperServers
* @throws InterruptedException
* : when failed to fetch broker list from cache
* @throws IOException
* : when failed create ZooKeeper session
* @throws Exception
*/
public ZookeeperCacheLoader(String zookeeperServers) throws InterruptedException, IOException {
this.zookeeperServers = zookeeperServers;
start();
}
/**
* starts ZooKeeper session
*
* @throws InterruptedException
* : when failed to fetch broker list from cache
* @throws IOException
* : when failed create zk session
*/
public void start() throws InterruptedException, IOException {
localZkConnectionSvc = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), zookeeperServers,
public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zookeeperServers) throws Exception {
localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
zooKeeperSessionTimeoutMillis);
localZkConnectionSvc.start(new ShutdownService() {
@Override
public void shutdown(int exitCode) {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
}
}
localZkConnectionSvc.start(exitCode -> {
log.error("Shutting down ZK sessions: {}", exitCode);
});
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
// attach ZooKeeperChildrenCache listener
initializeBrokerList();
}
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
}
});
/**
* 1. creates ZooKeeper Children cache on path {@value LOADBALANCE_BROKERS_ROOT}, 2. sets watch on the path and 3.
* maintain list of available brokers at availableActiveBrokers
*
* @throws InterruptedException
* @throws IOException
*
*/
private void initializeBrokerList() throws InterruptedException, IOException {
this.availableActiveBrokerCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableActiveBrokerCache.registerListener(new ZooKeeperCacheListener<Set<String>>() {
this.brokerInfo = new ZooKeeperDataCache<LoadReport>(localZkCache) {
@Override
public void onUpdate(String path, Set<String> data, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("Update Received for path {}", path);
}
availableActiveBrokers = Lists.newArrayList(data);
public LoadReport deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, LoadReport.class);
}
};
this.availableBrokersCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableBrokersCache.registerListener((path, brokerNodes, stat) -> {
try {
updateBrokerList(brokerNodes);
} catch (Exception e) {
log.warn("Error updating broker info after broker list changed.", e);
}
});
// initialize available broker list
try {
this.availableActiveBrokers = Lists.newArrayList(availableActiveBrokerCache.get());
} catch (KeeperException e) {
log.warn("Failed to find broker znode children under {}", LOADBALANCE_BROKERS_ROOT, e);
throw new IOException(String.format("Failed to find broker list in zk at %s with %s ",
LOADBALANCE_BROKERS_ROOT, e.getMessage()), e);
}
}
private ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
// Do initial fetch of brokers list
updateBrokerList(availableBrokersCache.get());
}
public List<String> getAvailableActiveBrokers() {
return this.availableActiveBrokers;
public List<LoadReport> getAvailableBrokers() {
return availableBrokers;
}
public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
@Override
public void close() {
orderedExecutor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
List<LoadReport> availableBrokers = new ArrayList<>(brokerNodes.size());
for (String broker : brokerNodes) {
availableBrokers.add(brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker));
}
this.availableBrokers = availableBrokers;
}
private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheLoader.class);
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
public class BaseZKStarterTest {
protected MockZooKeeper mockZookKeeper;
protected void start() throws Exception {
mockZookKeeper = createMockZooKeeper();
}
protected void close() throws Exception {
mockZookKeeper.shutdown();
}
/**
* Create MockZookeeper instance
* @return
* @throws Exception
*/
protected MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor());
ZkUtils.createFullPathOptimistic(zk, LOADBALANCE_BROKERS_ROOT,
"".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return zk;
}
protected static class DiscoveryZooKeeperClientFactoryImpl implements ZooKeeperClientFactory {
static ZooKeeper zk;
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
return CompletableFuture.completedFuture(zk);
}
}
}
......@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
......@@ -23,26 +23,18 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.ws.rs.HttpMethod;
......@@ -52,22 +44,25 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.filter.LoggingFilter;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.discovery.service.server.DiscoveryServiceStarter;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
......@@ -75,76 +70,92 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
* 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and
* POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception
* with redirected broker
*
*
*/
public class DiscoveryServiceTest {
public class DiscoveryServiceWebTest extends BaseZKStarterTest{
private Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFilter.class));
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
@BeforeMethod
private void init() throws Exception {
start();
}
@AfterMethod
private void cleanup() throws Exception {
close();
}
@Test
public void testNextBroker() throws Exception {
List<String> brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000");
System.setProperty("zookeeperServers", "dummy-value");
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
// 1. create znode for each broker
brokers.stream().forEach(b -> {
List<String> brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3");
brokers.stream().forEach(broker -> {
try {
zk.create(LOADBALANCE_BROKERS_ROOT + "/" + b, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
LoadReport report = new LoadReport(broker, null, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + broker,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
fail("failed while creating broker znodes");
fail("failed while creating broker znodes", e);
} catch (JsonProcessingException e) {
fail("failed while creating broker znodes", e);
}
});
Thread.sleep(200);
// 2. Setup discovery-zkcache
DiscoveryServiceServlet discovery = new DiscoveryServiceServlet();
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZookeeperCacheLoader zkCache = new ZookeeperCacheLoader(new DiscoveryZooKeeperClientFactoryImpl(),
"zk-test-servers");
zkCacheField.set(discovery, zkCache);
// 2. verify nextBroker functionality : round-robin in broker list
// 3. verify nextBroker functionality : round-robin in broker list
for (String broker : brokers) {
assertEquals(broker, discovery.nextBroker());
assertEquals(broker, discovery.nextBroker().getWebServiceUrl());
}
zk.close();
}
@Test
public void testRiderectUrlWithServerStarted() throws Exception {
// 1. start server
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
int port = nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
server.start(resources);
// 2. get ZookeeperCacheLoader to add more brokers
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
// 2. create znode for each broker
List<String> brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3");
// 3. create znode for each broker
brokers.stream().forEach(b -> {
try {
zk.create(LOADBALANCE_BROKERS_ROOT + "/" + b + ":15000", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
final String broker = b + ":15000";
LoadReport report = new LoadReport("http://" + broker, null, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + broker,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
fail("failed while creating broker znodes", e);
} catch (JsonProcessingException e) {
fail("failed while creating broker znodes", e);
}
});
......@@ -152,8 +163,8 @@ public class DiscoveryServiceTest {
String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1";
/**
* verify : every time when vip receives a request: it redirects to above brokers sequentially and client must
* get unknown host exception with above brokers in a sequential manner.
* 3. verify : every time when vip receives a request: it redirects to above brokers sequentially and client
* must get unknown host exception with above brokers in a sequential manner.
**/
assertEquals(brokers, validateRequest(brokers, HttpMethod.PUT, requestUrl, new BundlesData(1)),
......@@ -164,40 +175,13 @@ public class DiscoveryServiceTest {
server.stop();
zk.close();
}
@Test
public void testDiscoveryServiceStarter() throws Exception {
int port = nextFreePort();
File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
testConfigFile.delete();
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.yahoo.com,z2.yahoo.com,z3.yahoo.com");
printWriter.println("webServicePort=" + port);
printWriter.close();
testConfigFile.deleteOnExit();
DiscoveryServiceStarter.main(new String[] { testConfigFile.getAbsolutePath() });
String host = InetAddress.getLocalHost().getHostAddress();
String requestUrl = String.format("http://%s:%d/%s", host, port, "admin/namespaces/p1/c1/n1");
WebTarget webTarget = client.target(requestUrl);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
Response response = invocationBuilder.get();
assertEquals(response.getStatus(), 503);
testConfigFile.delete();
}
@Test
public void testTlsEnable() throws Exception {
// 1. start server with tls enable
final boolean allowInsecure = false;
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
int port = nextFreePort();
int tlsPort = nextFreePort();
ServiceConfig config = new ServiceConfig();
......@@ -207,29 +191,36 @@ public class DiscoveryServiceTest {
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
ServerManager server = new ServerManager(config);
server.start(resources);
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
// 2. get ZookeeperCacheLoader to add more brokers
DiscoveryService discovery = new DiscoveryService();
Field zkCacheField = DiscoveryService.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZooKeeper zk = ((ZookeeperCacheLoader) zkCacheField.get(discovery)).getLocalZkCache().getZooKeeper();
final String redirect_broker_host = "broker-1";
List<String> brokers = Lists.newArrayList(redirect_broker_host);
// 3. create znode for each broker
brokers.stream().forEach(b -> {
try {
zk.create(LOADBALANCE_BROKERS_ROOT + "/" + b + ":" + tlsPort, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
final String brokerUrl = b + ":" + port;
final String brokerUrlTls = b + ":" + tlsPort;
LoadReport report = new LoadReport("http://" + brokerUrl, "https://" + brokerUrlTls, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + brokerUrl,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
fail("failed while creating broker znodes", e);
} catch (JsonProcessingException e) {
fail("failed while creating broker znodes");
}
});
// 4. https request with tls enable at server side
// 3. https request with tls enable at server side
String serviceUrl = String.format("https://localhost:%s/", tlsPort);
String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1";
......@@ -243,7 +234,7 @@ public class DiscoveryServiceTest {
fail("it should give unknown host exception as: discovery service redirects request to: "
+ redirect_broker_host);
} catch (Exception e) {
// 5. Verify: server accepts https request and redirected to one of the available broker host defined into
// 4. Verify: server accepts https request and redirected to one of the available broker host defined into
// zk. and as broker-service is not up: it should give "UnknownHostException with host=broker-url"
String host = e.getLocalizedMessage();
assertEquals(e.getClass(), UnknownHostException.class);
......@@ -255,7 +246,6 @@ public class DiscoveryServiceTest {
@Test
public void testException() {
RestException exception1 = new RestException(BAD_GATEWAY, "test-msg");
assertTrue(exception1.getMessage().contains(BAD_GATEWAY.toString()));
RestException exception2 = new RestException(BAD_GATEWAY.getStatusCode(), "test-msg");
......
......@@ -13,18 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
......@@ -32,32 +30,33 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.beust.jcommander.internal.Lists;
import com.yahoo.pulsar.discovery.service.ZookeeperCacheLoader;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
public class ZookeeperCacheLoaderTest {
private MockZooKeeper mockZookKeeper;
public class ZookeeperCacheLoaderTest extends BaseZKStarterTest {
@BeforeMethod
void setup() throws Exception {
mockZookKeeper = MockZooKeeper.newInstance();
private void init() throws Exception {
start();
}
@AfterMethod
void teardown() throws Exception {
mockZookKeeper.shutdown();
private void cleanup() throws Exception {
close();
}
/**
* Create znode for available broker in ZooKeeper and updates it again to verify ZooKeeper cache update
*
*
* @throws InterruptedException
* @throws KeeperException
* @throws IOException
*/
@Test
public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, IOException {
ZookeeperCacheLoader zkLoader = new ZookeeperCacheLoader(null);
public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, Exception {
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
ZookeeperCacheLoader zkLoader = new ZookeeperCacheLoader(new DiscoveryZooKeeperClientFactoryImpl(), "");
List<String> brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000");
// 1. create znode for each broker
......@@ -73,7 +72,7 @@ public class ZookeeperCacheLoaderTest {
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 2. get available brokers from ZookeeperCacheLoader
List<String> list = zkLoader.getAvailableActiveBrokers();
List<LoadReport> list = zkLoader.getAvailableBrokers();
// 3. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
......@@ -86,7 +85,7 @@ public class ZookeeperCacheLoaderTest {
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 4.b. get available brokers from ZookeeperCacheLoader
list = zkLoader.getAvailableActiveBrokers();
list = zkLoader.getAvailableBrokers();
// 4.c. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.zookeeper;
import static com.yahoo.pulsar.discovery.service.DiscoveryService.LOADBALANCE_BROKERS_ROOT;
import java.io.IOException;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.yahoo.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
/**
*
* Test Mock LocalZooKeeperConnectionService
*
*/
public class LocalZooKeeperConnectionService {
private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperConnectionService.class);
private final ZooKeeperClientFactory zkClientFactory;
private final String zkConnect;
private final long zkSessionTimeoutMillis;
private ZooKeeper localZooKeeper;
private ZooKeeperSessionWatcher localZooKeeperSessionWatcher;
public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, String zkConnect,
long zkSessionTimeoutMillis) {
this.zkClientFactory = zkClientFactory;
this.zkConnect = zkConnect;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
}
public void start(ShutdownService shutdownService) throws IOException {
// mock zk
try {
localZooKeeper = MockZooKeeper.newInstance();
ZkUtils.createFullPathOptimistic(localZooKeeper, LOADBALANCE_BROKERS_ROOT, "test".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis,
shutdownService);
localZooKeeperSessionWatcher.start();
localZooKeeper.register(localZooKeeperSessionWatcher);
} catch (Exception e) {
throw new IOException("Failed to establish session with local ZK", e);
}
}
public void close() throws IOException {
if (localZooKeeper != null) {
try {
localZooKeeper.close();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
if (localZooKeeperSessionWatcher != null) {
localZooKeeperSessionWatcher.close();
}
}
public ZooKeeper getLocalZooKeeper() {
return this.localZooKeeper;
}
/**
* Check if a persist node exists. If not, it attempts to create the znode.
*
* @param path
* znode path
* @throws KeeperException
* zookeeper exception.
* @throws InterruptedException
* zookeeper exception.
*/
public static void checkAndCreatePersistNode(ZooKeeper zkc, String path)
throws KeeperException, InterruptedException {
// check if the node exists
if (zkc.exists(path, false) == null) {
/**
* create znode
*/
try {
// do create the node
zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("created znode, path={}", path);
} catch (Exception e) {
LOG.warn("create znode failed, path={} : {}", path, e.getMessage(), e);
}
}
}
public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data, createMode, false);
}
public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode, boolean gc)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data.getBytes(Charsets.UTF_8), createMode, gc);
}
public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode)
throws KeeperException, InterruptedException {
return createIfAbsent(zk, path, data, createMode, false);
}
public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode, boolean gc)
throws KeeperException, InterruptedException {
String pathCreated = null;
try {
pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
} catch (NodeExistsException e) {
// OK
LOG.debug("Create skipped for existing znode: path={}", path);
}
// reset if what exists is the ephemeral garbage.
if (gc && (pathCreated == null) && CreateMode.EPHEMERAL.equals(createMode)) {
Stat stat = zk.exists(path, false);
if (stat != null && zk.getSessionId() != stat.getEphemeralOwner()) {
deleteIfExists(zk, path, -1);
pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
}
}
return pathCreated;
}
public static void deleteIfExists(ZooKeeper zk, String path, int version)
throws KeeperException, InterruptedException {
try {
zk.delete(path, version);
} catch (NoNodeException e) {
// OK
LOG.debug("Delete skipped for non-existing znode: path={}", path);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册