提交 43a05541 编写于 作者: F fjy

Merge pull request #455 from metamx/async

Asynchronous servlet for query forwarding
......@@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
......@@ -313,17 +314,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
......
......@@ -17,66 +17,97 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class RouterQuerySegmentWalker implements QuerySegmentWalker
public class RoutingDruidClient<IntermediateType, FinalType>
{
private final QueryToolChestWarehouse warehouse;
private static final Logger log = new Logger(RoutingDruidClient.class);
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final BrokerSelector brokerSelector;
private final TierConfig tierConfig;
private final AtomicInteger openConnections;
private final boolean isSmile;
@Inject
public RouterQuerySegmentWalker(
QueryToolChestWarehouse warehouse,
public RoutingDruidClient(
ObjectMapper objectMapper,
@Global HttpClient httpClient,
BrokerSelector brokerSelector,
TierConfig tierConfig
@Global HttpClient httpClient
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return makeRunner();
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
public int getNumOpenConnections()
{
return makeRunner();
return openConnections.get();
}
private <T> QueryRunner<T> makeRunner()
public ListenableFuture<FinalType> run(
String host,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
return new TierAwareQueryRunner<T>(
warehouse,
objectMapper,
httpClient,
brokerSelector,
tierConfig
);
final ListenableFuture<FinalType> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.go(responseHandler);
openConnections.getAndIncrement();
Futures.addCallback(
future,
new FutureCallback<FinalType>()
{
@Override
public void onSuccess(FinalType result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
return future;
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.metamx.common.Pair;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
/**
*/
public interface HostSelector<T>
{
public String getDefaultServiceName();
public Pair<String, ServerDiscoverySelector> select(Query<T> query);
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.client.RoutingDruidClient;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.Query;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.DateTime;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
*/
@WebServlet(asyncSupported = true)
public class AsyncQueryForwardingServlet extends HttpServlet
{
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String DISPATCHED = "dispatched";
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final RoutingDruidClient routingDruidClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIDProvider idProvider;
public AsyncQueryForwardingServlet(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
RoutingDruidClient routingDruidClient,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryIDProvider idProvider
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.routingDruidClient = routingDruidClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType());
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
try {
final AsyncContext ctx = req.startAsync(req, resp);
if (req.getAttribute(DISPATCHED) != null) {
return;
}
req.setAttribute(DISPATCHED, true);
resp.setStatus(200);
resp.setContentType("application/x-javascript");
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = idProvider.next(query);
query = query.withId(queryId);
}
requestLogger.log(
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
);
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getHost(query);
final Query theQuery = query;
final String theQueryId = queryId;
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
outputStream.write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
byte[] bytes = getContentBytes(chunk.getContent());
if (bytes.length > 0) {
try {
clientResponse.getObj().write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.info("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(theQuery.getDataSource().getName())
.setUser4(theQuery.getType())
.setUser5(theQuery.getIntervals().get(0).toString())
.setUser6(String.valueOf(theQuery.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(theQueryId)
.setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
ctx.dispatch();
}
return ClientResponse.finished(obj);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
};
ctx.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.run(host, theQuery, responseHandler);
}
}
);
}
catch (Exception e) {
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
log.makeAlert(e, "Exception handling request")
.addData("query", query)
.addData("peer", req.getRemoteAddr())
.emit();
}
}
}
......@@ -136,8 +136,8 @@ public class QueryResource
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(queryId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.setUser10(queryId)
.build("request/time", requestTime)
);
}
......
......@@ -49,7 +49,7 @@ import io.druid.server.DruidNode;
import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.ServletException;
......@@ -154,13 +154,11 @@ public class JettyServerModule extends JerseyServletModule
threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads());
final Server server = new Server();
server.setThreadPool(threadPool);
final Server server = new Server(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();
ServerConnector connector = new ServerConnector(server);
connector.setPort(node.getPort());
connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
connector.setStatsOn(true);
connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
server.setConnectors(new Connector[]{connector});
......
......@@ -57,7 +57,7 @@ public class CoordinatorRuleManager
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final Supplier<TierConfig> config;
private final Supplier<TieredBrokerConfig> config;
private final ServerDiscoverySelector selector;
private final StatusResponseHandler responseHandler;
......@@ -73,7 +73,7 @@ public class CoordinatorRuleManager
public CoordinatorRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
Supplier<TierConfig> config,
Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{
......
......@@ -19,86 +19,67 @@
package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import io.druid.client.DirectDruidClient;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class TierAwareQueryRunner<T> implements QueryRunner<T>
public class QueryHostFinder<T>
{
private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class);
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final BrokerSelector<T> brokerSelector;
private final TierConfig tierConfig;
private final TieredBrokerHostSelector hostSelector;
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
public TierAwareQueryRunner(
QueryToolChestWarehouse warehouse,
ObjectMapper objectMapper,
HttpClient httpClient,
BrokerSelector<T> brokerSelector,
TierConfig tierConfig
@Inject
public QueryHostFinder(
TieredBrokerHostSelector hostSelector
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
this.hostSelector = hostSelector;
}
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
final String brokerServiceName = selected.lhs;
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
final String serviceName = selected.lhs;
final ServerDiscoverySelector selector = selected.rhs;
Server server = selector.pick();
if (server == null) {
log.error(
"WTF?! No server found for brokerServiceName[%s]. Using backup",
brokerServiceName
"WTF?! No server found for serviceName[%s]. Using backup",
serviceName
);
server = serverBackup.get(brokerServiceName);
server = serverBackup.get(serviceName);
if (server == null) {
log.makeAlert(
"WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
brokerServiceName,
tierConfig.getDefaultBrokerServiceName()
).emit();
log.error(
"WTF?! No backup found for serviceName[%s]. Using default[%s]",
serviceName,
hostSelector.getDefaultServiceName()
);
server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
server = serverBackup.get(hostSelector.getDefaultServiceName());
}
} else {
serverBackup.put(brokerServiceName, server);
}
if (server != null) {
serverBackup.put(serviceName, server);
}
return server;
}
@Override
public Sequence<T> run(Query<T> query)
public String getHost(Query<T> query)
{
Server server = findServer(query);
......@@ -106,16 +87,12 @@ public class TierAwareQueryRunner<T> implements QueryRunner<T>
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return Sequences.empty();
return null;
}
QueryRunner<T> client = new DirectDruidClient<T>(
warehouse,
objectMapper,
httpClient,
server.getHost()
);
log.info("Selected [%s]", server.getHost());
return client.run(query);
return server.getHost();
}
}
......@@ -29,7 +29,7 @@ import java.util.LinkedHashMap;
/**
*/
public class TierConfig
public class TieredBrokerConfig
{
@JsonProperty
@NotNull
......
......@@ -20,20 +20,20 @@
package io.druid.server.router;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.client.selector.HostSelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
......@@ -42,23 +42,23 @@ import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class BrokerSelector<T>
public class TieredBrokerHostSelector<T> implements HostSelector<T>
{
private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
private final CoordinatorRuleManager ruleManager;
private final TierConfig tierConfig;
private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final Object lock = new Object();
private volatile boolean started = false;
@Inject
public BrokerSelector(
public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager,
TierConfig tierConfig,
TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
......@@ -112,6 +112,12 @@ public class BrokerSelector<T>
}
}
@Override
public String getDefaultServiceName()
{
return tierConfig.getDefaultBrokerServiceName();
}
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
{
synchronized (lock) {
......@@ -120,35 +126,46 @@ public class BrokerSelector<T>
}
}
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
String brokerServiceName = null;
// find the rule that can apply to the entire set of intervals
DateTime now = new DateTime();
int lastRulePosition = -1;
LoadRule baseRule = null;
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
brokerServiceName = Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
);
}
for (Interval interval : query.getIntervals()) {
int currRulePosition = 0;
for (Rule rule : rules) {
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
lastRulePosition = currRulePosition;
baseRule = (LoadRule) rule;
break;
if (brokerServiceName == null) {
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
// find the rule that can apply to the entire set of intervals
DateTime now = new DateTime();
int lastRulePosition = -1;
LoadRule baseRule = null;
for (Interval interval : query.getIntervals()) {
int currRulePosition = 0;
for (Rule rule : rules) {
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
lastRulePosition = currRulePosition;
baseRule = (LoadRule) rule;
break;
}
currRulePosition++;
}
currRulePosition++;
}
}
if (baseRule == null) {
return null;
}
if (baseRule == null) {
return null;
}
// in the baseRule, find the broker of highest priority
String brokerServiceName = null;
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
brokerServiceName = entry.getValue();
break;
// in the baseRule, find the broker of highest priority
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
brokerServiceName = entry.getValue();
break;
}
}
}
......
......@@ -40,20 +40,20 @@ import java.util.LinkedHashMap;
/**
*/
public class TierAwareQueryRunnerTest
public class QueryHostFinderTest
{
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
private TierConfig config;
private TieredBrokerHostSelector brokerSelector;
private TieredBrokerConfig config;
private Server server;
@Before
public void setUp() throws Exception
{
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = EasyMock.createMock(BrokerSelector.class);
brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class);
config = new TierConfig()
config = new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
......@@ -118,12 +118,8 @@ public class TierAwareQueryRunnerTest
EasyMock.expect(selector.pick()).andReturn(server).once();
EasyMock.replay(selector);
TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
null,
null,
null,
brokerSelector,
config
QueryHostFinder queryRunner = new QueryHostFinder(
brokerSelector
);
Server server = queryRunner.findServer(
......
......@@ -50,11 +50,11 @@ import java.util.List;
/**
*/
public class BrokerSelectorTest
public class TieredBrokerHostSelectorTest
{
private ServerDiscoveryFactory factory;
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
private TieredBrokerHostSelector brokerSelector;
@Before
public void setUp() throws Exception
......@@ -62,9 +62,9 @@ public class BrokerSelectorTest
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = new BrokerSelector(
brokerSelector = new TieredBrokerHostSelector(
new TestRuleManager(null, null, null, null),
new TierConfig()
new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
......@@ -112,11 +112,12 @@ public class BrokerSelectorTest
public void testBasicSelect() throws Exception
{
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01")))
.build()
).lhs;
Assert.assertEquals("coldBroker", brokerName);
......@@ -127,11 +128,12 @@ public class BrokerSelectorTest
public void testBasicSelect2() throws Exception
{
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01")))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
......@@ -141,11 +143,12 @@ public class BrokerSelectorTest
public void testSelectMatchesNothing() throws Exception
{
Pair retVal = brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01")))
.build()
);
Assert.assertEquals(null, retVal);
......@@ -199,7 +202,7 @@ public class BrokerSelectorTest
public TestRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
Supplier<TierConfig> config,
Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{
......
......@@ -25,24 +25,20 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.QueryResource;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.router.BrokerSelector;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.RouterQuerySegmentWalker;
import io.druid.server.router.TierConfig;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector;
import org.eclipse.jetty.server.Server;
import java.util.List;
......@@ -71,19 +67,16 @@ public class CliRouter extends ServerRunnable
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
binder.bind(QueryHostFinder.class).in(LazySingleton.class);
binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
......@@ -92,7 +85,7 @@ public class CliRouter extends ServerRunnable
@Provides
@ManageLifecycle
public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
TierConfig config,
TieredBrokerConfig config,
ServerDiscoveryFactory factory
)
......
......@@ -38,16 +38,13 @@ public class QueryJettyServerInitializer implements JettyServerInitializer
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.RoutingDruidClient;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.QueryIDProvider;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final RoutingDruidClient routingDruidClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIDProvider idProvider;
@Inject
public RouterJettyServerInitializer(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
RoutingDruidClient routingDruidClient,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryIDProvider idProvider
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.routingDruidClient = routingDruidClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.addServlet(
new ServletHolder(
new AsyncQueryForwardingServlet(
jsonMapper,
smileMapper,
hostFinder,
routingDruidClient,
emitter,
requestLogger,
idProvider
)
), "/druid/v2/*"
);
queries.addFilter(GzipFilter.class, "/druid/v2/*", null);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册