提交 7aef4634 编写于 作者: F fjy

Merge pull request #647 from metamx/better-async

completely async request proxying + jetty update
......@@ -324,17 +324,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.2.1.v20140609</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.2.1.v20140609</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>9.2.1.v20140609</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
......
......@@ -38,6 +38,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -72,7 +73,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
}
public ListenableFuture<FinalType> postQuery(
String url,
URI uri,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
......@@ -80,9 +81,9 @@ public class RoutingDruidClient<IntermediateType, FinalType>
final ListenableFuture<FinalType> future;
try {
log.debug("Querying url[%s]", url);
log.debug("Querying url[%s]", uri);
future = httpClient
.post(new URL(url))
.post(uri.toURL())
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
.go(responseHandler);
......@@ -115,13 +116,13 @@ public class RoutingDruidClient<IntermediateType, FinalType>
}
public ListenableFuture<FinalType> get(
String url,
URI uri,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.get(new URL(url))
.get(uri.toURL())
.go(responseHandler);
}
catch (IOException e) {
......@@ -130,13 +131,13 @@ public class RoutingDruidClient<IntermediateType, FinalType>
}
public ListenableFuture<FinalType> delete(
String url,
URI uri,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.delete(new URL(url))
.delete(uri.toURL())
.go(responseHandler);
}
catch (IOException e) {
......
......@@ -27,6 +27,9 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
......@@ -37,7 +40,6 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
......@@ -49,12 +51,13 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
......@@ -67,7 +70,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Joiner COMMA_JOIN = Joiner.on(",");
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
......@@ -76,7 +78,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private final RequestLogger requestLogger;
public AsyncQueryForwardingServlet(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
......@@ -85,7 +86,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
RequestLogger requestLogger
)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
......@@ -98,216 +98,244 @@ public class AsyncQueryForwardingServlet extends HttpServlet
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
final AsyncContext asyncContext = req.startAsync(req, res);
// default async timeout to be same as maxIdleTime for now
asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis());
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final String host = hostFinder.getDefaultHost();
routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
}
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0);
final HttpResponseHandler<ServletOutputStream, ServletOutputStream> responseHandler =
new PassthroughHttpResponseHandler(res);
final URI uri = rewriteURI(hostFinder.getDefaultHost(), req);
asyncComplete(
res,
asyncContext,
jsonMapper,
routingDruidClient.get(uri, responseHandler)
);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0);
final String host = hostFinder.getDefaultHost();
routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
}
final HttpResponseHandler<ServletOutputStream, ServletOutputStream> responseHandler =
new PassthroughHttpResponseHandler(res);
final String host = hostFinder.getDefaultHost();
asyncComplete(
res,
asyncContext,
jsonMapper,
routingDruidClient.delete(rewriteURI(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler)
);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
protected void doPost(final HttpServletRequest req, final HttpServletResponse res) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(req.getContentType());
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
try {
final Query inputQuery = objectMapper.readValue(req.getInputStream(), Query.class);
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", inputQuery);
}
Query inputQuery = null;
try {
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
if (inputQuery.getId() == null) {
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
}
final Query query = inputQuery;
final Query query;
if (inputQuery.getId() == null) {
query = inputQuery.withId(UUID.randomUUID().toString());
} else {
query = inputQuery;
}
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", inputQuery);
}
URI rewrittenURI = rewriteURI(hostFinder.getHost(query), req);
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(
asyncContext,
objectMapper
)
{
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.debug("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.valueOf(query.getContextPriority(0)))
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(request.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
request.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time",
requestTime,
"success",
true
)
)
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final AsyncContext asyncContext = req.startAsync();
// let proxy http client timeout
asyncContext.setTimeout(0);
return super.done(clientResponse);
}
};
ListenableFuture future = routingDruidClient.postQuery(
rewrittenURI,
query,
new PassthroughHttpResponseHandler(res)
);
routingDruidClient.postQuery(
makeUrl(hostFinder.getHost(inputQuery), request),
inputQuery,
responseHandler
Futures.addCallback(
future,
new FutureCallback()
{
@Override
public void onSuccess(@Nullable Object o)
{
final long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.valueOf(query.getContextPriority(0)))
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time",
requestTime,
"success",
true
)
)
)
);
}
catch (Exception e) {
log.error(e, "Unable to log query [%s]!", query);
}
}
catch (Exception e) {
handleException(objectMapper, asyncContext, e);
@Override
public void onFailure(Throwable throwable)
{
try {
final String errorMessage = throwable.getMessage();
requestLogger.log(
new RequestLogLine(
new DateTime(),
request.getRemoteAddr(),
inputQuery,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"success",
false,
"exception",
errorMessage == null ? "no message" : errorMessage)
)
)
);
}
catch (Exception logError) {
log.error(logError, "Unable to log query [%s]!", inputQuery);
catch (IOException logError) {
log.error(logError, "Unable to log query [%s]!", query);
}
log.makeAlert(e, "Exception handling request")
.addData("query", inputQuery)
.addData("peer", request.getRemoteAddr())
log.makeAlert(throwable, "Exception handling request [%s]", query.getId())
.addData("query", query)
.addData("peer", req.getRemoteAddr())
.emit();
}
}
);
asyncComplete(
res,
asyncContext,
objectMapper,
future
);
} catch(IOException e) {
log.warn(e, "Exception parsing query");
final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
null,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", errorMessage))
)
);
res.setStatus(HttpServletResponse.SC_BAD_REQUEST);
objectMapper.writeValue(
res.getOutputStream(),
ImmutableMap.of("error", errorMessage)
);
} catch(Exception e) {
handleException(res, objectMapper, e);
}
}
private static void asyncComplete(
final HttpServletResponse res,
final AsyncContext asyncContext,
final ObjectMapper objectMapper,
ListenableFuture future
)
{
Futures.addCallback(
future,
new FutureCallback<Object>()
{
@Override
public void onSuccess(@Nullable Object o)
{
asyncContext.complete();
}
@Override
public void onFailure(Throwable throwable)
{
log.error(throwable, "Error processing query response");
try {
handleException(res, objectMapper, throwable);
} catch(Exception err) {
log.error(err, "Unable to handle exception response");
}
asyncContext.complete();
}
}
);
}
private String makeUrl(final String host, final HttpServletRequest req)
private URI rewriteURI(final String host, final HttpServletRequest req)
{
final StringBuilder uri = new StringBuilder("http://");
uri.append(host);
uri.append(req.getRequestURI());
final String queryString = req.getQueryString();
final String requestURI = req.getRequestURI() == null ? "" : req.getRequestURI();
if (queryString == null) {
return String.format("http://%s%s", host, requestURI);
if (queryString != null) {
uri.append("?").append(queryString);
}
return String.format("http://%s%s?%s", host, requestURI, queryString);
return URI.create(uri.toString());
}
private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception)
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Throwable exception) throws IOException
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (!response.isCommitted()) {
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
response.resetBuffer();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
objectMapper.writeValue(
response.getOutputStream(),
ImmutableMap.of(
"error", errorMessage
)
);
}
response.flushBuffer();
}
catch (IOException e) {
Throwables.propagate(e);
}
finally {
asyncContext.complete();
if (!response.isCommitted()) {
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
response.resetBuffer();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
objectMapper.writeValue(
response.getOutputStream(),
ImmutableMap.of("error", errorMessage)
);
}
response.flushBuffer();
}
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<OutputStream, OutputStream>
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<ServletOutputStream, ServletOutputStream>
{
private final AsyncContext asyncContext;
private final ObjectMapper objectMapper;
private final OutputStream outputStream;
private final HttpServletResponse response;
public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException
public PassthroughHttpResponseHandler(HttpServletResponse response) throws IOException
{
this.asyncContext = asyncContext;
this.objectMapper = objectMapper;
this.outputStream = asyncContext.getResponse().getOutputStream();
this.response = response;
}
protected void copyStatusHeaders(HttpResponse clientResponse)
{
final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(clientResponse.getStatus().getCode());
response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE));
......@@ -337,29 +365,29 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse clientResponse)
public ClientResponse<ServletOutputStream> handleResponse(HttpResponse clientResponse)
{
copyStatusHeaders(clientResponse);
try {
final ServletOutputStream outputStream = response.getOutputStream();
ChannelBuffer buf = clientResponse.getContent();
buf.readBytes(outputStream, buf.readableBytes());
return ClientResponse.unfinished(outputStream);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
public ClientResponse<ServletOutputStream> handleChunk(
ClientResponse<ServletOutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
buf.readBytes(clientResponse.getObj(), buf.readableBytes());
}
catch (Exception e) {
throw Throwables.propagate(e);
......@@ -368,25 +396,18 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
public ClientResponse<ServletOutputStream> done(ClientResponse<ServletOutputStream> clientResponse)
{
asyncContext.complete();
return ClientResponse.finished(clientResponse.getObj());
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
ClientResponse<ServletOutputStream> clientResponse,
Throwable e
)
{
log.error(e, "Error processing query response");
// throwing an exception here may cause resource leak
try {
handleException(objectMapper, asyncContext, e);
} catch(Exception err) {
log.error(err, "Unable to handle exception response");
}
// exceptions are handled on future callback
}
}
}
......@@ -148,6 +148,16 @@ public class QueryResource
);
}
if ((boolean) query.getContextValue("b", false)) {
System.out.println("***NEW QUERY***");
while (true) {
System.out.println("SLEEPING");
Thread.sleep(10000);
}
} else if ((boolean) query.getContextValue("a", false)) {
return Response.ok("hi").build();
}
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
}
......
......@@ -30,6 +30,7 @@ import io.druid.server.QueryStats;
import io.druid.server.RequestLogLine;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Map;
public class EmittingRequestLogger implements RequestLogger
......@@ -44,7 +45,7 @@ public class EmittingRequestLogger implements RequestLogger
}
@Override
public void log(final RequestLogLine requestLogLine) throws Exception
public void log(final RequestLogLine requestLogLine) throws IOException
{
emitter.emit(new RequestLogEventBuilder(feed, requestLogLine));
}
......
......@@ -110,7 +110,7 @@ public class FileRequestLogger implements RequestLogger
}
@Override
public void log(RequestLogLine requestLogLine) throws Exception
public void log(RequestLogLine requestLogLine) throws IOException
{
synchronized (lock) {
fileWriter.write(
......
......@@ -21,12 +21,14 @@ package io.druid.server.log;
import io.druid.server.RequestLogLine;
import java.io.IOException;
/**
*/
public class NoopRequestLogger implements RequestLogger
{
@Override
public void log(RequestLogLine requestLogLine) throws Exception
public void log(RequestLogLine requestLogLine) throws IOException
{
// This is a no op!
}
......
......@@ -21,9 +21,11 @@ package io.druid.server.log;
import io.druid.server.RequestLogLine;
import java.io.IOException;
/**
*/
public interface RequestLogger
{
public void log(RequestLogLine requestLogLine) throws Exception;
public void log(RequestLogLine requestLogLine) throws IOException;
}
......@@ -80,7 +80,6 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
queries.addServlet(
new ServletHolder(
new AsyncQueryForwardingServlet(
config,
jsonMapper,
smileMapper,
hostFinder,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册