diff --git a/processing/src/main/java/io/druid/query/DataSourceUtil.java b/processing/src/main/java/io/druid/query/DataSourceUtil.java index 109db95ee049938d8a184fffb2438c2b26aec548..7c056f1eabc581ed334f47aeca8d23424671609e 100644 --- a/processing/src/main/java/io/druid/query/DataSourceUtil.java +++ b/processing/src/main/java/io/druid/query/DataSourceUtil.java @@ -19,10 +19,14 @@ package io.druid.query; +import com.google.common.base.Joiner; + import java.util.List; public class DataSourceUtil { + public static final Joiner COMMA_JOIN = Joiner.on(","); + public static String getMetricName(DataSource dataSource) { final List names = dataSource.getNames(); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 5d0981bb1ac78916a647bd9eec465e8ab5be1288..b485fa2ce69e0aafa5e5d86a566a724411a6a257 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -22,16 +22,22 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.query.DataSourceUtil; import io.druid.query.Query; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.proxy.AsyncProxyServlet; import org.joda.time.DateTime; @@ -72,6 +78,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; private final HttpClient httpClient; + private final ServiceEmitter emitter; private final RequestLogger requestLogger; public AsyncQueryForwardingServlet( @@ -79,6 +86,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @Router HttpClient httpClient, + ServiceEmitter emitter, RequestLogger requestLogger ) { @@ -86,6 +94,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet this.smileMapper = smileMapper; this.hostFinder = hostFinder; this.httpClient = httpClient; + this.emitter = emitter; this.requestLogger = requestLogger; } @@ -97,31 +106,37 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet String host = hostFinder.getDefaultHost(); Query inputQuery = null; - try { - inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); - if (inputQuery != null) { - host = hostFinder.getHost(inputQuery); + boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null; + long startTime = System.currentTimeMillis(); + + // queries only exist for POST + if (request.getMethod().equalsIgnoreCase(HttpMethod.POST.asString())) { + try { + inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + if (inputQuery != null) { + host = hostFinder.getHost(inputQuery); + } + } + catch (IOException e) { + log.warn(e, "Exception parsing query"); + final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + null, + new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) + ) + ); + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of("error", errorMessage) + ); + } + catch (Exception e) { + handleException(response, objectMapper, e); } - } - catch (IOException e) { - log.warn(e, "Exception parsing query"); - final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); - requestLogger.log( - new RequestLogLine( - new DateTime(), - request.getRemoteAddr(), - null, - new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) - ) - ); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - objectMapper.writeValue( - response.getOutputStream(), - ImmutableMap.of("error", errorMessage) - ); - } - catch (Exception e) { - handleException(response, objectMapper, e); } final int requestId = getRequestId(request); @@ -148,7 +163,6 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet .version(HttpVersion.fromString(request.getProtocol())); // Copy headers - boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null; for (Enumeration headerNames = request.getHeaderNames(); headerNames.hasMoreElements(); ) { String headerName = headerNames.nextElement(); @@ -176,8 +190,12 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet getTimeout(), TimeUnit.MILLISECONDS ); - if (hasContent && inputQuery != null) { - proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery))); + if (hasContent) { + if (inputQuery != null) { + proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery))); + } else { + proxyRequest.content(proxyRequestContent(proxyRequest, request)); + } } customizeProxyRequest(proxyRequest, request); @@ -238,4 +256,102 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet } return URI.create(uri.toString()); } + + private class MetricsEmittingProxyResponseListener extends ProxyResponseListener + { + private final HttpServletRequest req; + private final HttpServletResponse res; + private final Query query; + private final long start; + + public MetricsEmittingProxyResponseListener( + HttpServletRequest request, + HttpServletResponse response, + Query query, + long start + ) + { + super(request, response); + + this.req = request; + this.res = response; + this.query = query; + this.start = start; + } + + @Override + public void onComplete(Result result) + { + final long requestTime = System.currentTimeMillis() - start; + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser3(String.valueOf(query.getContextPriority(0))) + .setUser4(query.getType()) + .setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", + requestTime, + "success", + true + ) + ) + ) + ); + } + catch (Exception e) { + log.error(e, "Unable to log query [%s]!", query); + } + + super.onComplete(result); + } + + @Override + public void onFailure(Response response, Throwable failure) + { + try { + final String errorMessage = failure.getMessage(); + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "success", + false, + "exception", + errorMessage == null ? "no message" : errorMessage + ) + ) + ) + ); + } + catch (IOException logError) { + log.error(logError, "Unable to log query [%s]!", query); + } + + log.makeAlert(failure, "Exception handling request") + .addData("exception", failure.toString()) + .addData("query", query) + .addData("peer", req.getRemoteAddr()) + .emit(); + + super.onFailure(response, failure); + } + } } diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index b3ec2702d5863426c871a1ab2bc217c3ac6d8903..72db25e758a6a8ac8a940081866e682173e780ce 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.servlet.GuiceFilter; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.server.AsyncQueryForwardingServlet; @@ -48,6 +49,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; private final HttpClient httpClient; + private final ServiceEmitter emitter; private final RequestLogger requestLogger; @Inject @@ -56,6 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @Router HttpClient httpClient, + ServiceEmitter emitter, RequestLogger requestLogger ) { @@ -63,6 +66,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer this.smileMapper = smileMapper; this.hostFinder = hostFinder; this.httpClient = httpClient; + this.emitter = emitter; this.requestLogger = requestLogger; } @@ -77,6 +81,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer smileMapper, hostFinder, httpClient, + emitter, requestLogger ) ), "/druid/v2/*"