提交 cc40a339 编写于 作者: F fjy

Merge pull request #563 from tucksaun/fix-rabbitmq-connection-retries

Make RabbitMQ Firehose resilient to broker deconnections
...@@ -25,9 +25,13 @@ import com.metamx.common.logger.Logger; ...@@ -25,9 +25,13 @@ import com.metamx.common.logger.Logger;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ConsumerCancelledException;
import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
...@@ -40,6 +44,8 @@ import net.jodah.lyra.retry.RetryPolicy; ...@@ -40,6 +44,8 @@ import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration; import net.jodah.lyra.util.Duration;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* A FirehoseFactory for RabbitMQ. * A FirehoseFactory for RabbitMQ.
...@@ -179,7 +185,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa ...@@ -179,7 +185,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
* Storing the latest delivery as a member variable should be safe since this will only be run * Storing the latest delivery as a member variable should be safe since this will only be run
* by a single thread. * by a single thread.
*/ */
private QueueingConsumer.Delivery delivery; private Delivery delivery;
/** /**
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to * Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
...@@ -268,4 +274,41 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa ...@@ -268,4 +274,41 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
{ {
return parser; return parser;
} }
private static class QueueingConsumer extends DefaultConsumer
{
private final BlockingQueue<Delivery> _queue;
public QueueingConsumer(Channel ch) {
this(ch, new LinkedBlockingQueue<Delivery>());
}
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
super(ch);
this._queue = q;
}
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
_queue.clear();
}
@Override public void handleCancel(String consumerTag) throws IOException {
_queue.clear();
}
@Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
this._queue.add(new Delivery(envelope, properties, body));
}
public Delivery nextDelivery()
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{
return _queue.take();
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册