提交 f52d11af 编写于 作者: P philippgrulich 提交者: Stephan Ewen

[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config

This closes #2281
上级 923c6a7e
......@@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
private String queueName;
private RMQConnectionConfig rmqConnectionConfig;
private transient Connection connection;
private transient Channel channel;
private SerializationSchema<IN> schema;
protected final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema<IN> schema;
private boolean logFailuresOnly = false;
/**
......@@ -57,6 +57,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
this.schema = schema;
}
/**
* Sets up the queue. The default implementation just declares the queue. The user may override
* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
* defining custom queue parameters)
*/
protected void setupQueue() throws IOException {
channel.queueDeclare(queueName, false, false, false, null);
}
/**
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, then exceptions will be only logged, if set to false,
......@@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
channel.queueDeclare(queueName, false, false, false, null);
setupQueue();
} catch (IOException e) {
throw new RuntimeException("Error while creating the channel", e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册