提交 0577fdaa 编写于 作者: R Rad Gruchalski 提交者: Matteo Merli

WebService refactoring (#40)

* Little WebService refactor, removing magic values, removing unnecessary argument in the constructor.
上级 577596e4
...@@ -246,7 +246,7 @@ public class PulsarService implements AutoCloseable { ...@@ -246,7 +246,7 @@ public class PulsarService implements AutoCloseable {
LOG.info("Starting Pulsar Broker service"); LOG.info("Starting Pulsar Broker service");
brokerService.start(); brokerService.start();
this.webService = new WebService(config, this); this.webService = new WebService(this);
this.webService.addRestResources("/", "com.yahoo.pulsar.broker.web", false); this.webService.addRestResources("/", "com.yahoo.pulsar.broker.web", false);
this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true); this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true); this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);
......
...@@ -73,7 +73,7 @@ public abstract class PulsarWebResource { ...@@ -73,7 +73,7 @@ public abstract class PulsarWebResource {
protected PulsarService pulsar() { protected PulsarService pulsar() {
if (pulsar == null) { if (pulsar == null) {
pulsar = (PulsarService) servletContext.getAttribute("pulsar"); pulsar = (PulsarService) servletContext.getAttribute(WebService.ATTRIBUTE_PULSAR_NAME);
} }
return pulsar; return pulsar;
......
...@@ -53,7 +53,6 @@ import com.google.common.collect.ImmutableList; ...@@ -53,7 +53,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException; import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.common.util.ObjectMapperFactory; import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.SecurityUtility; import com.yahoo.pulsar.common.util.SecurityUtility;
...@@ -63,52 +62,61 @@ import io.netty.util.concurrent.DefaultThreadFactory; ...@@ -63,52 +62,61 @@ import io.netty.util.concurrent.DefaultThreadFactory;
* Web Service embedded into Pulsar * Web Service embedded into Pulsar
*/ */
public class WebService implements AutoCloseable { public class WebService implements AutoCloseable {
private final ServiceConfiguration config;
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final ExecutorService webServiceExecutor;
/** /**
* The set of path regexes on which the ApiVersionFilter is installed if needed * The set of path regexes on which the ApiVersionFilter is installed if needed
*/ */
private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(Pattern.compile("^/lookup.*") // V2 private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(
// lookups Pattern.compile("^/lookup.*") // V2 lookups
); );
private static final String MATCH_ALL = "/*";
public static final String ATTRIBUTE_PULSAR_NAME = "pulsar";
public static final String HANDLER_CACHE_CONTROL = "max-age=3600";
public static final String HANDLER_REQUEST_LOG_TZ = "GMT";
public static final int NUM_ACCEPTORS = 32; // make it configurable?
public static final int MAX_CONCURRENT_REQUESTES = 1024; // make it configurable?
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
private final ExecutorService webServiceExecutor;
public WebService(ServiceConfiguration config, PulsarService pulsar) throws PulsarServerException { public WebService(PulsarService pulsar) throws PulsarServerException {
this.config = config; this.handlers = Lists.newArrayList();
this.pulsar = pulsar; this.pulsar = pulsar;
this.webServiceExecutor = Executors.newFixedThreadPool(32, new DefaultThreadFactory("pulsar-web")); this.webServiceExecutor = Executors.newFixedThreadPool(WebService.NUM_ACCEPTORS, new DefaultThreadFactory("pulsar-web"));
this.server = new Server(new ExecutorThreadPool(webServiceExecutor)); this.server = new Server(new ExecutorThreadPool(webServiceExecutor));
List<ServerConnector> connectors = new ArrayList<>(); List<ServerConnector> connectors = new ArrayList<>();
ServerConnector connector = new PulsarServerConnector(server, 1, 1); ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort()); connector.setPort(pulsar.getConfiguration().getWebServicePort());
connector.setHost(pulsar.getBindAddress()); connector.setHost(pulsar.getBindAddress());
connectors.add(connector); connectors.add(connector);
if (config.isTlsEnabled()) { if (pulsar.getConfiguration().isTlsEnabled()) {
SslContextFactory sslCtxFactory = new SslContextFactory(); SslContextFactory sslCtxFactory = new SslContextFactory();
try { try {
SSLContext sslCtx = SecurityUtility.createSslContext(config.isTlsAllowInsecureConnection(), sslCtxFactory.setSslContext(
config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(), SecurityUtility.createSslContext(
config.getTlsKeyFilePath()); pulsar.getConfiguration().isTlsAllowInsecureConnection(),
sslCtxFactory.setSslContext(sslCtx); pulsar.getConfiguration().getTlsTrustCertsFilePath(),
pulsar.getConfiguration().getTlsCertificateFilePath(),
pulsar.getConfiguration().getTlsKeyFilePath()));
} catch (GeneralSecurityException e) { } catch (GeneralSecurityException e) {
throw new PulsarServerException(e); throw new PulsarServerException(e);
} }
sslCtxFactory.setWantClientAuth(true); sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory); ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls()); tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
tlsConnector.setHost(pulsar.getBindAddress()); tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector); connectors.add(tlsConnector);
} }
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size())); connectors.forEach(c -> c.setAcceptQueueSize(WebService.MAX_CONCURRENT_REQUESTES / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
} }
...@@ -125,23 +133,23 @@ public class WebService implements AutoCloseable { ...@@ -125,23 +133,23 @@ public class WebService implements AutoCloseable {
public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) { public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path); context.setContextPath(path);
context.addServlet(servletHolder, "/*"); context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute("pulsar", pulsar); context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar);
context.setAttribute("config", config);
if (requiresAuthentication && config.isAuthenticationEnabled()) { if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar)); FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar));
context.addFilter(filter, "/*", EnumSet.allOf(DispatcherType.class)); context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
} }
log.info("Servlet path: '{}' -- Enable client version check: {} -- shouldCheckApiVersionOnPath: {}", path, log.info("Servlet path: '{}' -- Enable client version check: {} -- shouldCheckApiVersionOnPath: {}", path,
config.isClientLibraryVersionCheckEnabled(), shouldCheckApiVersionOnPath(path)); pulsar.getConfiguration().isClientLibraryVersionCheckEnabled(),
if (config.isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) { shouldCheckApiVersionOnPath(path));
if (pulsar.getConfiguration().isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) {
// Add the ApiVersionFilter to reject request from deprecated // Add the ApiVersionFilter to reject request from deprecated
// clients. // clients.
FilterHolder holder = new FilterHolder( FilterHolder holder = new FilterHolder(
new ApiVersionFilter(pulsar, config.isClientLibraryVersionCheckAllowUnversioned())); new ApiVersionFilter(pulsar, pulsar.getConfiguration().isClientLibraryVersionCheckAllowUnversioned()));
context.addFilter(holder, "/*", EnumSet.allOf(DispatcherType.class)); context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
log.info("Enabling ApiVersionFilter"); log.info("Enabling ApiVersionFilter");
} }
...@@ -154,7 +162,7 @@ public class WebService implements AutoCloseable { ...@@ -154,7 +162,7 @@ public class WebService implements AutoCloseable {
ResourceHandler resHandler = new ResourceHandler(); ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
resHandler.setEtags(true); resHandler.setEtags(true);
resHandler.setCacheControl("max-age=3600"); resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler); capHandler.setHandler(resHandler);
handlers.add(capHandler); handlers.add(capHandler);
} }
...@@ -181,7 +189,7 @@ public class WebService implements AutoCloseable { ...@@ -181,7 +189,7 @@ public class WebService implements AutoCloseable {
RequestLogHandler requestLogHandler = new RequestLogHandler(); RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog(); Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true); requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT"); requestLog.setLogTimeZone(WebService.HANDLER_REQUEST_LOG_TZ);
requestLog.setLogLatency(true); requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog); requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection()); handlers.add(0, new ContextHandlerCollection());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册