提交 4c6eba9e 编写于 作者: F fjy

Merge pull request #645 from metamx/smarter-router

Allow router to override selection based on config
......@@ -37,6 +37,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
......@@ -66,6 +67,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Joiner COMMA_JOIN = Joiner.on(",");
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
......@@ -74,6 +76,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private final RequestLogger requestLogger;
public AsyncQueryForwardingServlet(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
......@@ -82,6 +85,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
RequestLogger requestLogger
)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
......@@ -95,6 +99,8 @@ public class AsyncQueryForwardingServlet extends HttpServlet
throws ServletException, IOException
{
final AsyncContext asyncContext = req.startAsync(req, res);
// default async timeout to be same as maxIdleTime for now
asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis());
asyncContext.start(
new Runnable()
{
......
......@@ -41,6 +41,7 @@ import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime;
......@@ -70,6 +71,7 @@ public class QueryResource
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
......@@ -79,6 +81,7 @@ public class QueryResource
@Inject
public QueryResource(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
......@@ -87,6 +90,7 @@ public class QueryResource
QueryManager queryManager
)
{
this.config = config;
this.jsonMapper = jsonMapper.copy();
this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
......@@ -135,6 +139,15 @@ public class QueryResource
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
}
if (query.getContextValue("timeout") == null) {
query = query.withOverriddenContext(
ImmutableMap.of(
"timeout",
config.getMaxIdleTime().toStandardDuration().getMillis()
)
);
}
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
/**
*/
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);
if (priority < 0) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}
return Optional.absent();
}
}
......@@ -54,6 +54,10 @@ public class TieredBrokerConfig
@NotNull
private Period pollPeriod = new Period("PT1M");
@JsonProperty
@NotNull
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";
// tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap()
{
......@@ -88,4 +92,9 @@ public class TieredBrokerConfig
{
return pollPeriod;
}
public String getStrategies()
{
return strategies;
}
}
......@@ -19,6 +19,7 @@
package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
......@@ -30,7 +31,6 @@ import io.druid.client.selector.HostSelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
......@@ -50,6 +50,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final List<TieredBrokerSelectorStrategy> strategies;
private final Object lock = new Object();
......@@ -59,12 +60,14 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager,
TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
ServerDiscoveryFactory serverDiscoveryFactory,
List<TieredBrokerSelectorStrategy> strategies
)
{
this.ruleManager = ruleManager;
this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory;
this.strategies = strategies;
}
@LifecycleStart
......@@ -128,12 +131,12 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
String brokerServiceName = null;
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
brokerServiceName = Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
);
for (TieredBrokerSelectorStrategy strategy : strategies) {
final Optional<String> optionalName = strategy.getBrokerServiceName(tierConfig, query);
if (optionalName.isPresent()) {
brokerServiceName = optionalName.get();
break;
}
}
if (brokerServiceName == null) {
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.List;
/**
*/
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
{
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList();
@Inject
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
{
try {
this.strategies.addAll(
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public List<TieredBrokerSelectorStrategy> get()
{
return strategies;
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import io.druid.query.Query;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class)
})
public interface TieredBrokerSelectorStrategy
{
public Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query query);
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
/**
*/
public class TimeBoundaryTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}
return Optional.absent();
}
}
......@@ -84,7 +84,8 @@ public class TieredBrokerHostSelectorTest
return "hotBroker";
}
},
factory
factory,
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy())
);
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
......@@ -196,6 +197,30 @@ public class TieredBrokerHostSelectorTest
Assert.assertEquals("coldBroker", brokerName);
}
@Test
public void testPrioritySelect() throws Exception
{
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.intervals(
new MultipleIntervalSegmentSpec(
Arrays.<Interval>asList(
new Interval("2011-08-31/2011-09-01"),
new Interval("2012-08-31/2012-09-01"),
new Interval("2013-08-31/2013-09-01")
)
)
)
.context(ImmutableMap.<String, Object>of("priority", -1))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
}
private static class TestRuleManager extends CoordinatorRuleManager
{
public TestRuleManager(
......
......@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.RoutingDruidClient;
......@@ -42,6 +43,8 @@ import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector;
import io.druid.server.router.TieredBrokerSelectorStrategiesProvider;
import io.druid.server.router.TieredBrokerSelectorStrategy;
import org.eclipse.jetty.server.Server;
import java.util.List;
......@@ -79,6 +82,10 @@ public class CliRouter extends ServerRunnable
binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
binder.bind(QueryHostFinder.class).in(LazySingleton.class);
binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>(){})
.toProvider(TieredBrokerSelectorStrategiesProvider.class)
.in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
......
......@@ -29,6 +29,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.eclipse.jetty.server.Handler;
......@@ -44,6 +45,7 @@ import org.eclipse.jetty.servlets.AsyncGzipFilter;
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
......@@ -53,6 +55,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
@Inject
public RouterJettyServerInitializer(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
......@@ -61,6 +64,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
RequestLogger requestLogger
)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
......@@ -76,6 +80,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
queries.addServlet(
new ServletHolder(
new AsyncQueryForwardingServlet(
config,
jsonMapper,
smileMapper,
hostFinder,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册