diff --git a/pom.xml b/pom.xml index 02575cd1a42f12849d0cd6cef509af17db9d6232..d1b94e4fbc95e46c55718d09d4ce88e3d8d4a216 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,8 @@ ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. --> - + 4.0.0 io.druid druid @@ -313,17 +314,17 @@ org.eclipse.jetty jetty-server - 8.1.11.v20130520 + 9.1.3.v20140225 org.eclipse.jetty jetty-servlet - 8.1.11.v20130520 + 9.1.3.v20140225 org.eclipse.jetty jetty-servlets - 8.1.11.v20130520 + 9.1.3.v20140225 joda-time diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java new file mode 100644 index 0000000000000000000000000000000000000000..9fd3e2b0eac1f78daec6769f35b0943a6358ace6 --- /dev/null +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -0,0 +1,113 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.HttpResponseHandler; +import io.druid.guice.annotations.Global; +import io.druid.query.Query; +import org.jboss.netty.handler.codec.http.HttpHeaders; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.URL; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class RoutingDruidClient +{ + private static final Logger log = new Logger(RoutingDruidClient.class); + + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + + private final AtomicInteger openConnections; + private final boolean isSmile; + + @Inject + public RoutingDruidClient( + ObjectMapper objectMapper, + @Global HttpClient httpClient + ) + { + this.objectMapper = objectMapper; + this.httpClient = httpClient; + + this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; + this.openConnections = new AtomicInteger(); + } + + public int getNumOpenConnections() + { + return openConnections.get(); + } + + public ListenableFuture run( + String host, + Query query, + HttpResponseHandler responseHandler + ) + { + final ListenableFuture future; + final String url = String.format("http://%s/druid/v2/", host); + + try { + log.debug("Querying url[%s]", url); + future = httpClient + .post(new URL(url)) + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .go(responseHandler); + + openConnections.getAndIncrement(); + + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(FinalType result) + { + openConnections.getAndDecrement(); + } + + @Override + public void onFailure(Throwable t) + { + openConnections.getAndDecrement(); + } + } + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + + return future; + } +} diff --git a/server/src/main/java/io/druid/client/selector/HostSelector.java b/server/src/main/java/io/druid/client/selector/HostSelector.java new file mode 100644 index 0000000000000000000000000000000000000000..dd8f366ac94d524b0d4e4286c0011abb78aecc2f --- /dev/null +++ b/server/src/main/java/io/druid/client/selector/HostSelector.java @@ -0,0 +1,33 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.selector; + +import com.metamx.common.Pair; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; + +/** + */ +public interface HostSelector +{ + public String getDefaultServiceName(); + + public Pair select(Query query); +} diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java new file mode 100644 index 0000000000000000000000000000000000000000..85f33a7000777d95ec7298f2b2b642fb8b00d694 --- /dev/null +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -0,0 +1,238 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import com.metamx.http.client.response.ClientResponse; +import com.metamx.http.client.response.HttpResponseHandler; +import io.druid.client.RoutingDruidClient; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.query.Query; +import io.druid.server.log.RequestLogger; +import io.druid.server.router.QueryHostFinder; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.joda.time.DateTime; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +/** + */ +@WebServlet(asyncSupported = true) +public class AsyncQueryForwardingServlet extends HttpServlet +{ + private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final String DISPATCHED = "dispatched"; + + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; + private final QueryHostFinder hostFinder; + private final RoutingDruidClient routingDruidClient; + private final ServiceEmitter emitter; + private final RequestLogger requestLogger; + private final QueryIDProvider idProvider; + + public AsyncQueryForwardingServlet( + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QueryHostFinder hostFinder, + RoutingDruidClient routingDruidClient, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryIDProvider idProvider + ) + { + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + this.hostFinder = hostFinder; + this.routingDruidClient = routingDruidClient; + this.emitter = emitter; + this.requestLogger = requestLogger; + this.idProvider = idProvider; + } + + @Override + protected void doPost( + final HttpServletRequest req, final HttpServletResponse resp + ) throws ServletException, IOException + { + final long start = System.currentTimeMillis(); + Query query = null; + String queryId; + + final boolean isSmile = "application/smile".equals(req.getContentType()); + + ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; + + OutputStream out = null; + + try { + final AsyncContext ctx = req.startAsync(req, resp); + + if (req.getAttribute(DISPATCHED) != null) { + return; + } + + req.setAttribute(DISPATCHED, true); + resp.setStatus(200); + resp.setContentType("application/x-javascript"); + + query = objectMapper.readValue(req.getInputStream(), Query.class); + queryId = query.getId(); + if (queryId == null) { + queryId = idProvider.next(query); + query = query.withId(queryId); + } + + requestLogger.log( + new RequestLogLine(new DateTime(), req.getRemoteAddr(), query) + ); + out = resp.getOutputStream(); + final OutputStream outputStream = out; + + final String host = hostFinder.getHost(query); + + final Query theQuery = query; + final String theQueryId = queryId; + + final HttpResponseHandler responseHandler = new HttpResponseHandler() + { + @Override + public ClientResponse handleResponse(HttpResponse response) + { + byte[] bytes = getContentBytes(response.getContent()); + if (bytes.length > 0) { + try { + outputStream.write(bytes); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + return ClientResponse.finished(outputStream); + } + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + byte[] bytes = getContentBytes(chunk.getContent()); + if (bytes.length > 0) { + try { + clientResponse.getObj().write(bytes); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + final long requestTime = System.currentTimeMillis() - start; + + log.info("Request time: %d", requestTime); + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(theQuery.getDataSource().getName()) + .setUser4(theQuery.getType()) + .setUser5(theQuery.getIntervals().get(0).toString()) + .setUser6(String.valueOf(theQuery.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(theQueryId) + .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + final OutputStream obj = clientResponse.getObj(); + try { + resp.flushBuffer(); + outputStream.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + ctx.dispatch(); + } + + return ClientResponse.finished(obj); + } + + private byte[] getContentBytes(ChannelBuffer content) + { + byte[] contentBytes = new byte[content.readableBytes()]; + content.readBytes(contentBytes); + return contentBytes; + } + }; + + ctx.start( + new Runnable() + { + @Override + public void run() + { + routingDruidClient.run(host, theQuery, responseHandler); + } + } + ); + } + catch (Exception e) { + if (!resp.isCommitted()) { + resp.setStatus(500); + resp.resetBuffer(); + + if (out == null) { + out = resp.getOutputStream(); + } + + out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); + out.write("\n".getBytes(UTF8)); + } + + resp.flushBuffer(); + + log.makeAlert(e, "Exception handling request") + .addData("query", query) + .addData("peer", req.getRemoteAddr()) + .emit(); + } + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 051a56e4465c38e277e20c81c9ceadd43f31b881..a69a385950388aeafabeaf3331c234a9bd35af46 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -136,8 +136,8 @@ public class QueryResource .setUser5(query.getIntervals().get(0).toString()) .setUser6(String.valueOf(query.hasFilters())) .setUser7(req.getRemoteAddr()) + .setUser8(queryId) .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .setUser10(queryId) .build("request/time", requestTime) ); } diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java index 246645dac0cf0420b4eabbb3dbf1349b8254a260..0b93dce2270b244f852b66f6886aa279eef5aba4 100644 --- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java @@ -49,7 +49,7 @@ import io.druid.server.DruidNode; import io.druid.server.StatusResource; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.thread.QueuedThreadPool; import javax.servlet.ServletException; @@ -154,13 +154,11 @@ public class JettyServerModule extends JerseyServletModule threadPool.setMinThreads(config.getNumThreads()); threadPool.setMaxThreads(config.getNumThreads()); - final Server server = new Server(); - server.setThreadPool(threadPool); + final Server server = new Server(threadPool); - SelectChannelConnector connector = new SelectChannelConnector(); + ServerConnector connector = new ServerConnector(server); connector.setPort(node.getPort()); - connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis())); - connector.setStatsOn(true); + connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis())); server.setConnectors(new Connector[]{connector}); diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 9acf5f9485866b632f504424fc6a549c84d1211f..adf8e6fc48efa0b552f974fc8b2b776b791183ad 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -57,7 +57,7 @@ public class CoordinatorRuleManager private final HttpClient httpClient; private final ObjectMapper jsonMapper; - private final Supplier config; + private final Supplier config; private final ServerDiscoverySelector selector; private final StatusResponseHandler responseHandler; @@ -73,7 +73,7 @@ public class CoordinatorRuleManager public CoordinatorRuleManager( @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, - Supplier config, + Supplier config, ServerDiscoverySelector selector ) { diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java new file mode 100644 index 0000000000000000000000000000000000000000..dfd94ca5cbf4c6c85eb123c39a45787daa2111f0 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -0,0 +1,98 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.inject.Inject; +import com.metamx.common.Pair; +import com.metamx.emitter.EmittingLogger; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; + +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class QueryHostFinder +{ + private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class); + + private final TieredBrokerHostSelector hostSelector; + + private final ConcurrentHashMap serverBackup = new ConcurrentHashMap(); + + @Inject + public QueryHostFinder( + TieredBrokerHostSelector hostSelector + ) + { + this.hostSelector = hostSelector; + } + + public Server findServer(Query query) + { + final Pair selected = hostSelector.select(query); + + final String serviceName = selected.lhs; + final ServerDiscoverySelector selector = selected.rhs; + + Server server = selector.pick(); + if (server == null) { + log.error( + "WTF?! No server found for serviceName[%s]. Using backup", + serviceName + ); + + server = serverBackup.get(serviceName); + + if (server == null) { + log.error( + "WTF?! No backup found for serviceName[%s]. Using default[%s]", + serviceName, + hostSelector.getDefaultServiceName() + ); + + server = serverBackup.get(hostSelector.getDefaultServiceName()); + } + } + if (server != null) { + serverBackup.put(serviceName, server); + } + + return server; + } + + public String getHost(Query query) + { + Server server = findServer(query); + + if (server == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + + return null; + } + + log.info("Selected [%s]", server.getHost()); + + return server.getHost(); + } +} diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java deleted file mode 100644 index 92bf43135fe74fb8577baf633cb649066662d7ec..0000000000000000000000000000000000000000 --- a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.server.router; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import io.druid.curator.discovery.ServerDiscoveryFactory; -import io.druid.guice.annotations.Global; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; -import io.druid.query.SegmentDescriptor; -import org.joda.time.Interval; - -/** - */ -public class RouterQuerySegmentWalker implements QuerySegmentWalker -{ - private final QueryToolChestWarehouse warehouse; - private final ObjectMapper objectMapper; - private final HttpClient httpClient; - private final BrokerSelector brokerSelector; - private final TierConfig tierConfig; - - @Inject - public RouterQuerySegmentWalker( - QueryToolChestWarehouse warehouse, - ObjectMapper objectMapper, - @Global HttpClient httpClient, - BrokerSelector brokerSelector, - TierConfig tierConfig - ) - { - this.warehouse = warehouse; - this.objectMapper = objectMapper; - this.httpClient = httpClient; - this.brokerSelector = brokerSelector; - this.tierConfig = tierConfig; - } - - @Override - public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) - { - return makeRunner(); - } - - @Override - public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) - { - return makeRunner(); - } - - private QueryRunner makeRunner() - { - return new TierAwareQueryRunner( - warehouse, - objectMapper, - httpClient, - brokerSelector, - tierConfig - ); - } -} diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java deleted file mode 100644 index 3a098e0faed195fe4e1ff578b18454e7cde4a514..0000000000000000000000000000000000000000 --- a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.server.router; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.metamx.common.Pair; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import com.metamx.emitter.EmittingLogger; -import com.metamx.http.client.HttpClient; -import io.druid.client.DirectDruidClient; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoveryFactory; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryToolChestWarehouse; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - */ -public class TierAwareQueryRunner implements QueryRunner -{ - private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class); - - private final QueryToolChestWarehouse warehouse; - private final ObjectMapper objectMapper; - private final HttpClient httpClient; - private final BrokerSelector brokerSelector; - private final TierConfig tierConfig; - - private final ConcurrentHashMap serverBackup = new ConcurrentHashMap(); - - public TierAwareQueryRunner( - QueryToolChestWarehouse warehouse, - ObjectMapper objectMapper, - HttpClient httpClient, - BrokerSelector brokerSelector, - TierConfig tierConfig - ) - { - this.warehouse = warehouse; - this.objectMapper = objectMapper; - this.httpClient = httpClient; - this.brokerSelector = brokerSelector; - this.tierConfig = tierConfig; - } - - public Server findServer(Query query) - { - final Pair selected = brokerSelector.select(query); - final String brokerServiceName = selected.lhs; - final ServerDiscoverySelector selector = selected.rhs; - - Server server = selector.pick(); - if (server == null) { - log.error( - "WTF?! No server found for brokerServiceName[%s]. Using backup", - brokerServiceName - ); - - server = serverBackup.get(brokerServiceName); - - if (server == null) { - log.makeAlert( - "WTF?! No backup found for brokerServiceName[%s]. Using default[%s]", - brokerServiceName, - tierConfig.getDefaultBrokerServiceName() - ).emit(); - - server = serverBackup.get(tierConfig.getDefaultBrokerServiceName()); - } - } else { - serverBackup.put(brokerServiceName, server); - } - - return server; - } - - @Override - public Sequence run(Query query) - { - Server server = findServer(query); - - if (server == null) { - log.makeAlert( - "Catastrophic failure! No servers found at all! Failing request!" - ).emit(); - return Sequences.empty(); - } - - QueryRunner client = new DirectDruidClient( - warehouse, - objectMapper, - httpClient, - server.getHost() - ); - - return client.run(query); - } -} diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java similarity index 98% rename from server/src/main/java/io/druid/server/router/TierConfig.java rename to server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index c819edfce23dfc2be98fd9981caf9012a557db1b..67ff109f2176bb9f0247948bf772a4c4bb2d253e 100644 --- a/server/src/main/java/io/druid/server/router/TierConfig.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -29,7 +29,7 @@ import java.util.LinkedHashMap; /** */ -public class TierConfig +public class TieredBrokerConfig { @JsonProperty @NotNull diff --git a/server/src/main/java/io/druid/server/router/BrokerSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java similarity index 67% rename from server/src/main/java/io/druid/server/router/BrokerSelector.java rename to server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index e27acf09c10d362b9f792b87d621e3572fd33ddf..cc625cdc5a3b858b6e5ac68004afb467be67ed9f 100644 --- a/server/src/main/java/io/druid/server/router/BrokerSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -20,20 +20,20 @@ package io.druid.server.router; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import io.druid.concurrent.Execs; +import io.druid.client.selector.HostSelector; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.query.Query; +import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.joda.time.Interval; import java.util.List; @@ -42,23 +42,23 @@ import java.util.concurrent.ConcurrentHashMap; /** */ -public class BrokerSelector +public class TieredBrokerHostSelector implements HostSelector { - private static EmittingLogger log = new EmittingLogger(BrokerSelector.class); + private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class); private final CoordinatorRuleManager ruleManager; - private final TierConfig tierConfig; + private final TieredBrokerConfig tierConfig; private final ServerDiscoveryFactory serverDiscoveryFactory; - private final ConcurrentHashMap selectorMap = new ConcurrentHashMap(); + private final ConcurrentHashMap selectorMap = new ConcurrentHashMap<>(); private final Object lock = new Object(); private volatile boolean started = false; @Inject - public BrokerSelector( + public TieredBrokerHostSelector( CoordinatorRuleManager ruleManager, - TierConfig tierConfig, + TieredBrokerConfig tierConfig, ServerDiscoveryFactory serverDiscoveryFactory ) { @@ -112,6 +112,12 @@ public class BrokerSelector } } + @Override + public String getDefaultServiceName() + { + return tierConfig.getDefaultBrokerServiceName(); + } + public Pair select(final Query query) { synchronized (lock) { @@ -120,35 +126,46 @@ public class BrokerSelector } } - List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); + String brokerServiceName = null; - // find the rule that can apply to the entire set of intervals - DateTime now = new DateTime(); - int lastRulePosition = -1; - LoadRule baseRule = null; + // Somewhat janky way of always selecting highest priority broker for this type of query + if (query instanceof TimeBoundaryQuery) { + brokerServiceName = Iterables.getFirst( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ); + } - for (Interval interval : query.getIntervals()) { - int currRulePosition = 0; - for (Rule rule : rules) { - if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) { - lastRulePosition = currRulePosition; - baseRule = (LoadRule) rule; - break; + if (brokerServiceName == null) { + List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); + + // find the rule that can apply to the entire set of intervals + DateTime now = new DateTime(); + int lastRulePosition = -1; + LoadRule baseRule = null; + + for (Interval interval : query.getIntervals()) { + int currRulePosition = 0; + for (Rule rule : rules) { + if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) { + lastRulePosition = currRulePosition; + baseRule = (LoadRule) rule; + break; + } + currRulePosition++; } - currRulePosition++; } - } - if (baseRule == null) { - return null; - } + if (baseRule == null) { + return null; + } - // in the baseRule, find the broker of highest priority - String brokerServiceName = null; - for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { - if (baseRule.getTieredReplicants().containsKey(entry.getKey())) { - brokerServiceName = entry.getValue(); - break; + // in the baseRule, find the broker of highest priority + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + if (baseRule.getTieredReplicants().containsKey(entry.getKey())) { + brokerServiceName = entry.getValue(); + break; + } } } diff --git a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java similarity index 90% rename from server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java rename to server/src/test/java/io/druid/server/router/QueryHostFinderTest.java index d788daa2f2b7ab50ef4ae9d88e5bbf9599df99a7..e8bf260d5ac4af1b53f6bc806094b2675edec971 100644 --- a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java +++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java @@ -40,20 +40,20 @@ import java.util.LinkedHashMap; /** */ -public class TierAwareQueryRunnerTest +public class QueryHostFinderTest { private ServerDiscoverySelector selector; - private BrokerSelector brokerSelector; - private TierConfig config; + private TieredBrokerHostSelector brokerSelector; + private TieredBrokerConfig config; private Server server; @Before public void setUp() throws Exception { selector = EasyMock.createMock(ServerDiscoverySelector.class); - brokerSelector = EasyMock.createMock(BrokerSelector.class); + brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class); - config = new TierConfig() + config = new TieredBrokerConfig() { @Override public LinkedHashMap getTierToBrokerMap() @@ -118,12 +118,8 @@ public class TierAwareQueryRunnerTest EasyMock.expect(selector.pick()).andReturn(server).once(); EasyMock.replay(selector); - TierAwareQueryRunner queryRunner = new TierAwareQueryRunner( - null, - null, - null, - brokerSelector, - config + QueryHostFinder queryRunner = new QueryHostFinder( + brokerSelector ); Server server = queryRunner.findServer( diff --git a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java similarity index 84% rename from server/src/test/java/io/druid/server/router/BrokerSelectorTest.java rename to server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 55e5dda44ea13e434a2f381d3877ef59ec98850b..b2a22af38ec793b96eceae0df6b220a4423cfc2c 100644 --- a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -50,11 +50,11 @@ import java.util.List; /** */ -public class BrokerSelectorTest +public class TieredBrokerHostSelectorTest { private ServerDiscoveryFactory factory; private ServerDiscoverySelector selector; - private BrokerSelector brokerSelector; + private TieredBrokerHostSelector brokerSelector; @Before public void setUp() throws Exception @@ -62,9 +62,9 @@ public class BrokerSelectorTest factory = EasyMock.createMock(ServerDiscoveryFactory.class); selector = EasyMock.createMock(ServerDiscoverySelector.class); - brokerSelector = new BrokerSelector( + brokerSelector = new TieredBrokerHostSelector( new TestRuleManager(null, null, null, null), - new TierConfig() + new TieredBrokerConfig() { @Override public LinkedHashMap getTierToBrokerMap() @@ -112,11 +112,12 @@ public class BrokerSelectorTest public void testBasicSelect() throws Exception { String brokerName = (String) brokerSelector.select( - new TimeBoundaryQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2011-08-31/2011-09-01"))), - null - ) + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity("all") + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) + .intervals(Arrays.asList(new Interval("2011-08-31/2011-09-01"))) + .build() ).lhs; Assert.assertEquals("coldBroker", brokerName); @@ -127,11 +128,12 @@ public class BrokerSelectorTest public void testBasicSelect2() throws Exception { String brokerName = (String) brokerSelector.select( - new TimeBoundaryQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2013-08-31/2013-09-01"))), - null - ) + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity("all") + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) + .intervals(Arrays.asList(new Interval("2013-08-31/2013-09-01"))) + .build() ).lhs; Assert.assertEquals("hotBroker", brokerName); @@ -141,11 +143,12 @@ public class BrokerSelectorTest public void testSelectMatchesNothing() throws Exception { Pair retVal = brokerSelector.select( - new TimeBoundaryQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2010-08-31/2010-09-01"))), - null - ) + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity("all") + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) + .intervals(Arrays.asList(new Interval("2010-08-31/2010-09-01"))) + .build() ); Assert.assertEquals(null, retVal); @@ -199,7 +202,7 @@ public class BrokerSelectorTest public TestRuleManager( @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, - Supplier config, + Supplier config, ServerDiscoverySelector selector ) { diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 740edd8b55ba16d1a9a3ba154795d08e91798785..1d0d10073bf5881ed3a20fccda4b7125da5bcbae 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -25,24 +25,20 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.logger.Logger; import io.airlift.command.Command; +import io.druid.client.RoutingDruidClient; import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; -import io.druid.query.MapQueryToolChestWarehouse; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; -import io.druid.server.QueryResource; import io.druid.server.initialization.JettyServerInitializer; -import io.druid.server.router.BrokerSelector; import io.druid.server.router.CoordinatorRuleManager; -import io.druid.server.router.RouterQuerySegmentWalker; -import io.druid.server.router.TierConfig; +import io.druid.server.router.QueryHostFinder; +import io.druid.server.router.TieredBrokerConfig; +import io.druid.server.router.TieredBrokerHostSelector; import org.eclipse.jetty.server.Server; import java.util.List; @@ -71,19 +67,16 @@ public class CliRouter extends ServerRunnable @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.router", TierConfig.class); + JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class); binder.bind(CoordinatorRuleManager.class); LifecycleModule.register(binder, CoordinatorRuleManager.class); - binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class); + binder.bind(QueryHostFinder.class).in(LazySingleton.class); + binder.bind(RoutingDruidClient.class).in(LazySingleton.class); - binder.bind(BrokerSelector.class).in(ManageLifecycle.class); - binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class); - - binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, QueryResource.class); - LifecycleModule.register(binder, QueryResource.class); + binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class); LifecycleModule.register(binder, Server.class); DiscoveryModule.register(binder, Self.class); @@ -92,7 +85,7 @@ public class CliRouter extends ServerRunnable @Provides @ManageLifecycle public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( - TierConfig config, + TieredBrokerConfig config, ServerDiscoveryFactory factory ) diff --git a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java index f3be30e00ff66cce4dadefac13ea4fd5a8f47a97..34039e2cb1ca5eea803ad1d74c9535180ed93ef4 100644 --- a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java @@ -38,16 +38,13 @@ public class QueryJettyServerInitializer implements JettyServerInitializer @Override public void initialize(Server server, Injector injector) { - final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); - queries.setResourceBase("/"); - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); root.addFilter(GzipFilter.class, "/*", null); root.addFilter(GuiceFilter.class, "/*", null); final HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()}); + handlerList.setHandlers(new Handler[]{root, new DefaultHandler()}); server.setHandler(handlerList); } } diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..8ed1849af5cc01eaca4c8fb241095ac5052c1aea --- /dev/null +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -0,0 +1,104 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.RoutingDruidClient; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.server.AsyncQueryForwardingServlet; +import io.druid.server.QueryIDProvider; +import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.log.RequestLogger; +import io.druid.server.router.QueryHostFinder; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; + +/** + */ +public class RouterJettyServerInitializer implements JettyServerInitializer +{ + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; + private final QueryHostFinder hostFinder; + private final RoutingDruidClient routingDruidClient; + private final ServiceEmitter emitter; + private final RequestLogger requestLogger; + private final QueryIDProvider idProvider; + + @Inject + public RouterJettyServerInitializer( + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QueryHostFinder hostFinder, + RoutingDruidClient routingDruidClient, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryIDProvider idProvider + ) + { + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + this.hostFinder = hostFinder; + this.routingDruidClient = routingDruidClient; + this.emitter = emitter; + this.requestLogger = requestLogger; + this.idProvider = idProvider; + } + + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); + queries.addServlet( + new ServletHolder( + new AsyncQueryForwardingServlet( + jsonMapper, + smileMapper, + hostFinder, + routingDruidClient, + emitter, + requestLogger, + idProvider + ) + ), "/druid/v2/*" + ); + queries.addFilter(GzipFilter.class, "/druid/v2/*", null); + + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()}); + server.setHandler(handlerList); + } +}