提交 24a02680 编写于 作者: F fjy

cleanup code in router and add forwarding for get requests

上级 6893282a
......@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Client;
import io.druid.query.Query;
import org.jboss.netty.handler.codec.http.HttpHeaders;
......@@ -52,7 +52,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
@Inject
public RoutingDruidClient(
ObjectMapper objectMapper,
@Global HttpClient httpClient
@Client HttpClient httpClient
)
{
this.objectMapper = objectMapper;
......@@ -67,7 +67,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return openConnections.get();
}
public ListenableFuture<FinalType> run(
public ListenableFuture<FinalType> post(
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
......@@ -109,4 +109,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return future;
}
public ListenableFuture<FinalType> get(
String url,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.get(new URL(url))
.go(responseHandler);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
......@@ -85,6 +85,119 @@ public class AsyncQueryForwardingServlet extends HttpServlet
this.requestLogger = requestLogger;
}
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException
{
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getDefaultHost();
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/json");
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
outputStream.write(bytes);
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
byte[] bytes = getContentBytes(chunk.getContent());
if (bytes.length > 0) {
try {
clientResponse.getObj().write(bytes);
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
};
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.get(makeUrl(host, req), responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
if (ctx != null) {
ctx.complete();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
}
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
......@@ -99,16 +212,16 @@ public class AsyncQueryForwardingServlet extends HttpServlet
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
AsyncContext ctx = null;
try {
final AsyncContext ctx = req.startAsync(req, resp);
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
req.setAttribute(DISPATCHED, true);
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
......@@ -142,6 +255,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
outputStream.write(bytes);
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
......@@ -159,6 +273,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
clientResponse.getObj().write(bytes);
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
......@@ -202,30 +317,26 @@ public class AsyncQueryForwardingServlet extends HttpServlet
throw Throwables.propagate(e);
}
finally {
ctx.dispatch();
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
};
ctx.start(
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler);
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
......@@ -242,6 +353,10 @@ public class AsyncQueryForwardingServlet extends HttpServlet
resp.flushBuffer();
if (ctx != null) {
ctx.complete();
}
try {
requestLogger.log(
new RequestLogLine(
......@@ -272,4 +387,11 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
return String.format("http://%s%s?%s", host, req.getRequestURI(), req.getQueryString());
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
}
......@@ -49,7 +49,49 @@ public class QueryHostFinder<T>
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
return findServerInner(selected);
}
public Server findDefaultServer()
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.getDefaultLookup();
return findServerInner(selected);
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
public String getDefaultHost()
{
Server server = findDefaultServer();
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
return server.getHost();
}
private Server findServerInner(final Pair<String, ServerDiscoverySelector> selected)
{
if (selected == null) {
log.error("Danger, Will Robinson! Unable to find any brokers!");
}
......@@ -82,21 +124,4 @@ public class QueryHostFinder<T>
return server;
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
}
......@@ -193,7 +193,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
return new Pair<>(brokerServiceName, retVal);
}
private Pair<String, ServerDiscoverySelector> getDefaultLookup()
public Pair<String, ServerDiscoverySelector> getDefaultLookup()
{
final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);
......
......@@ -29,10 +29,12 @@ import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.HttpClientModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Self;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.router.CoordinatorRuleManager;
......@@ -62,6 +64,7 @@ public class CliRouter extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new HttpClientModule("druid.router.http", Client.class),
new Module()
{
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册