提交 5d96f6dc 编写于 作者: F fjy

Merge branch 'master' of github.com:metamx/druid

{
"queryType": "groupBy",
"dataSource": "rabbitmqtest",
"granularity": "all",
"dimensions": [],
"aggregations": [
{ "type": "count", "name": "rows" },
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
],
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
}
[{
"schema" : {
"dataSource":"rabbitmqtest",
"aggregators":[
{"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}
],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" }
},
"config" : {
"maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT1m"
},
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"username": "test-dude",
"password": "word-dude",
"virtualHost": "test-vhost"
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "messageTime" }
}
}]
package druid.examples.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.cli.*;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
*/
public class RabbitMQProducerMain
{
public static void main(String[] args)
throws Exception
{
// We use a List to keep track of option insertion order. See below.
final List<Option> optionList = new ArrayList<Option>();
optionList.add(OptionBuilder.withLongOpt("help")
.withDescription("display this help message")
.create("h"));
optionList.add(OptionBuilder.withLongOpt("hostname")
.hasArg()
.withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
.create("b"));
optionList.add(OptionBuilder.withLongOpt("port")
.hasArg()
.withDescription("the port of the AMQP broker [defaults to AMQP library default]")
.create("n"));
optionList.add(OptionBuilder.withLongOpt("username")
.hasArg()
.withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
.create("u"));
optionList.add(OptionBuilder.withLongOpt("password")
.hasArg()
.withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
.create("p"));
optionList.add(OptionBuilder.withLongOpt("vhost")
.hasArg()
.withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
.create("v"));
optionList.add(OptionBuilder.withLongOpt("exchange")
.isRequired()
.hasArg()
.withDescription("name of the AMQP exchange [required - no default]")
.create("e"));
optionList.add(OptionBuilder.withLongOpt("key")
.hasArg()
.withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
.create("k"));
optionList.add(OptionBuilder.withLongOpt("type")
.hasArg()
.withDescription("the type of exchange to create [default: 'topic']")
.create("t"));
optionList.add(OptionBuilder.withLongOpt("durable")
.withDescription("if set, a durable exchange will be declared [default: not set]")
.create("d"));
optionList.add(OptionBuilder.withLongOpt("autodelete")
.withDescription("if set, an auto-delete exchange will be declared [default: not set]")
.create("a"));
optionList.add(OptionBuilder.withLongOpt("single")
.withDescription("if set, only a single message will be sent [default: not set]")
.create("s"));
optionList.add(OptionBuilder.withLongOpt("start")
.hasArg()
.withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
.create());
optionList.add(OptionBuilder.withLongOpt("stop")
.hasArg()
.withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
.create());
optionList.add(OptionBuilder.withLongOpt("interval")
.hasArg()
.withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
.create());
optionList.add(OptionBuilder.withLongOpt("delay")
.hasArg()
.withDescription("the delay between sending messages in milliseconds [default: 100]")
.create());
// An extremely silly hack to maintain the above order in the help formatting.
HelpFormatter formatter = new HelpFormatter();
// Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
formatter.setOptionComparator(new Comparator(){
@Override
public int compare(Object o1, Object o2)
{
// I know this isn't fast, but who cares! The list is short.
return optionList.indexOf(o1) - optionList.indexOf(o2);
}
});
// Now we can add all the options to an Options instance. This is dumb!
Options options = new Options();
for (Option option : optionList) {
options.addOption(option);
}
CommandLine cmd = null;
try{
cmd = new BasicParser().parse(options, args);
}
catch(ParseException e){
formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
System.exit(1);
}
if(cmd.hasOption("h")) {
formatter.printHelp("RabbitMQProducerMain", options);
System.exit(2);
}
ConnectionFactory factory = new ConnectionFactory();
if(cmd.hasOption("b")){
factory.setHost(cmd.getOptionValue("b"));
}
if(cmd.hasOption("u")){
factory.setUsername(cmd.getOptionValue("u"));
}
if(cmd.hasOption("p")){
factory.setPassword(cmd.getOptionValue("p"));
}
if(cmd.hasOption("v")){
factory.setVirtualHost(cmd.getOptionValue("v"));
}
if(cmd.hasOption("n")){
factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
}
String exchange = cmd.getOptionValue("e");
String routingKey = "default.routing.key";
if(cmd.hasOption("k")){
routingKey = cmd.getOptionValue("k");
}
boolean durable = cmd.hasOption("d");
boolean autoDelete = cmd.hasOption("a");
String type = cmd.getOptionValue("t", "topic");
boolean single = cmd.hasOption("single");
int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));
Random r = new Random();
Calendar timer = Calendar.getInstance();
timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));
String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
do{
int wp = (10 + r.nextInt(90)) * 100;
String gender = r.nextBoolean() ? "male" : "female";
int age = 20 + r.nextInt(70);
String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);
channel.basicPublish(exchange, routingKey, null, line.getBytes());
System.out.println("Sent message: " + line);
timer.add(Calendar.SECOND, interval);
Thread.sleep(delay);
}while((!single && stop.after(timer.getTime())));
connection.close();
}
}
\ No newline at end of file
......@@ -120,6 +120,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>
......
......@@ -27,6 +27,7 @@ import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
})
......
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
/**
* A Jacksonified version of the RabbitMQ ConnectionFactory for better integration
* into the realtime.spec configuration file format.
*/
public class JacksonifiedConnectionFactory extends ConnectionFactory
{
@Override
@JsonProperty
public String getHost()
{
return super.getHost();
}
@Override
public void setHost(String host)
{
super.setHost(host);
}
@Override
@JsonProperty
public int getPort()
{
return super.getPort();
}
@Override
public void setPort(int port)
{
super.setPort(port);
}
@Override
@JsonProperty
public String getUsername()
{
return super.getUsername();
}
@Override
public void setUsername(String username)
{
super.setUsername(username);
}
@Override
@JsonProperty
public String getPassword()
{
return super.getPassword();
}
@Override
public void setPassword(String password)
{
super.setPassword(password);
}
@Override
@JsonProperty
public String getVirtualHost()
{
return super.getVirtualHost();
}
@Override
public void setVirtualHost(String virtualHost)
{
super.setVirtualHost(virtualHost);
}
@Override
@JsonProperty
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
{
super.setUri(uriString);
}
@Override
@JsonProperty
public int getRequestedChannelMax()
{
return super.getRequestedChannelMax();
}
@Override
public void setRequestedChannelMax(int requestedChannelMax)
{
super.setRequestedChannelMax(requestedChannelMax);
}
@Override
@JsonProperty
public int getRequestedFrameMax()
{
return super.getRequestedFrameMax();
}
@Override
public void setRequestedFrameMax(int requestedFrameMax)
{
super.setRequestedFrameMax(requestedFrameMax);
}
@Override
@JsonProperty
public int getRequestedHeartbeat()
{
return super.getRequestedHeartbeat();
}
@Override
public void setConnectionTimeout(int connectionTimeout)
{
super.setConnectionTimeout(connectionTimeout);
}
@Override
@JsonProperty
public int getConnectionTimeout()
{
return super.getConnectionTimeout();
}
@Override
public void setRequestedHeartbeat(int requestedHeartbeat)
{
super.setRequestedHeartbeat(requestedHeartbeat);
}
@Override
@JsonProperty
public Map<String, Object> getClientProperties()
{
return super.getClientProperties();
}
@Override
public void setClientProperties(Map<String, Object> clientProperties)
{
super.setClientProperties(clientProperties);
}
}
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* A configuration object for a RabbitMQ connection.
*/
public class RabbitMQFirehoseConfig
{
private String queue = null;
private String exchange = null;
private String routingKey = null;
private boolean durable = false;
private boolean exclusive = false;
private boolean autoDelete = false;
@JsonProperty
public String getQueue()
{
return queue;
}
public void setQueue(String queue)
{
this.queue = queue;
}
@JsonProperty
public String getExchange()
{
return exchange;
}
public void setExchange(String exchange)
{
this.exchange = exchange;
}
@JsonProperty
public String getRoutingKey()
{
return routingKey;
}
public void setRoutingKey(String routingKey)
{
this.routingKey = routingKey;
}
@JsonProperty
public boolean isDurable()
{
return durable;
}
public void setDurable(boolean durable)
{
this.durable = durable;
}
@JsonProperty
public boolean isExclusive()
{
return exclusive;
}
public void setExclusive(boolean exclusive)
{
this.exclusive = exclusive;
}
@JsonProperty
public boolean isAutoDelete()
{
return autoDelete;
}
public void setAutoDelete(boolean autoDelete)
{
this.autoDelete = autoDelete;
}
}
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
/**
* A FirehoseFactory for RabbitMQ.
* <p/>
* It will receive it's configuration through the realtime.spec file and expects to find a
* consumerProps element in the firehose definition with values for a number of configuration options.
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
* that have defaults can be skipped but options with no defaults must be specified with the exception
* of the URI property. If the URI property is set, it will override any other property that was also
* set.
* <p/>
* File: <em>realtime.spec</em>
* <pre>
* "firehose" : {
* "type" : "rabbitmq",
* "connection" : {
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
* },
* "config" : {
* "exchange": "test-exchange", # The exchange to connect to. No default
* "queue" : "druidtest", # The queue to connect to or create. No default
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
* "durable": "true", # Whether the queue should be durable. Default: 'false'
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
* "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false'
* },
* "parser" : {
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
* "data" : { "format" : "json" },
* "dimensionExclusions" : ["wp"]
* }
* },
* </pre>
* <p/>
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
* connection to it is lost. Furthermore it does not support any automatic failover on high availability
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
* could be "faked" to some extent we haven't implemented that yet. However, if a policy is defined in
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
* connection to another node is set up.
* <p/>
* For more information on RabbitMQ high availability please see:
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
*/
public class RabbitMQFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@JsonProperty
private final RabbitMQFirehoseConfig config;
@JsonProperty
private final StringInputRowParser parser;
@JsonProperty
private final ConnectionFactory connectionFactory;
@JsonCreator
public RabbitMQFirehoseFactory(
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
@JsonProperty("config") RabbitMQFirehoseConfig config,
@JsonProperty("parser") StringInputRowParser parser
)
{
this.connectionFactory = connectionFactory;
this.config = config;
this.parser = parser;
}
@Override
public Firehose connect() throws IOException
{
String queue = config.getQueue();
String exchange = config.getExchange();
String routingKey = config.getRoutingKey();
boolean durable = config.isDurable();
boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete();
final Connection connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener()
{
@Override
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Connection closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
final Channel channel = connection.createChannel();
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
channel.queueBind(queue, exchange, routingKey);
channel.addShutdownListener(new ShutdownListener()
{
@Override
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Channel closed!");
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
// We create a QueueingConsumer that will not auto-acknowledge messages since that
// happens on commit().
final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, false, consumer);
return new Firehose()
{
/**
* Storing the latest delivery as a member variable should be safe since this will only be run
* by a single thread.
*/
private QueueingConsumer.Delivery delivery;
/**
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
* and including this tag. See commit() for more detail.
*/
private long lastDeliveryTag;
@Override
public boolean hasMore()
{
delivery = null;
try {
// Wait for the next delivery. This will block until something is available.
delivery = consumer.nextDelivery();
if (delivery != null) {
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
// If delivery is non-null, we report that there is something more to process.
return true;
}
}
catch (InterruptedException e) {
// A little unclear on how we should handle this.
// At any rate, we're in an unknown state now so let's log something and return false.
log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen.");
}
// This means that delivery is null or we caught the exception above so we report that we have
// nothing more to process.
return false;
}
@Override
public InputRow nextRow()
{
if (delivery == null) {
//Just making sure.
log.wtf("I have nothing in delivery. Method hasMore() should have returned false.");
return null;
}
return parser.parse(new String(delivery.getBody()));
}
@Override
public Runnable commit()
{
// This method will be called from the same thread that calls the other methods of
// this Firehose. However, the returned Runnable will be called by a different thread.
//
// It should be (thread) safe to copy the lastDeliveryTag like we do below and then
// acknowledge values up to and including that value.
return new Runnable()
{
// Store (copy) the last delivery tag to "become" thread safe.
final long deliveryTag = lastDeliveryTag;
@Override
public void run()
{
try {
log.info("Acknowledging delivery of messages up to tag: " + deliveryTag);
// Acknowledge all messages up to and including the stored delivery tag.
channel.basicAck(deliveryTag, true);
}
catch (IOException e) {
log.error(e, "Unable to acknowledge message reception to message queue.");
}
}
};
}
@Override
public void close() throws IOException
{
log.info("Closing connection to RabbitMQ");
channel.close();
connection.close();
}
};
}
}
......@@ -3,4 +3,4 @@
#
# Script to upload tarball of assembly build to static.druid.io for serving
#
s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/releases
s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/releases/
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册