提交 bd0841e7 编写于 作者: Y yew1eb 提交者: Tzu-Li (Gordon) Tai

[FLINK-7654] [rabbitmq] Update RabbitMQ Java client version to 4.2.0

This closes #4694.
上级 4ba3eecd
......@@ -26,16 +26,14 @@ under the License.
# License of the RabbitMQ Connector
Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", licensed under the
[Mozilla Public License v1.1 (MPL 1.1)](https://www.mozilla.org/en-US/MPL/1.1/).
"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".
Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared
in the Mozilla Public License v1.1 (MPL 1.1).
must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
# RabbitMQ Connector
......
# License of the Rabbit MQ Connector
Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", licensed under the
Mozilla Public License v1.1 (MPL 1.1).
"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".
Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared
in the Mozilla Public License v1.1 (MPL 1.1).
must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
......@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
<rabbitmq.version>3.3.1</rabbitmq.version>
<rabbitmq.version>4.2.0</rabbitmq.version>
</properties>
<dependencies>
......
......@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* A Sink for publishing data into RabbitMQ.
......@@ -117,15 +118,19 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
@Override
public void close() {
IOException t = null;
Exception t = null;
try {
channel.close();
} catch (IOException e) {
if (channel != null) {
channel.close();
}
} catch (IOException | TimeoutException e) {
t = e;
}
try {
connection.close();
if (connection != null) {
connection.close();
}
} catch (IOException e) {
if (t != null) {
LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
......
......@@ -178,7 +178,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
public void close() throws Exception {
super.close();
try {
connection.close();
if (connection != null) {
connection.close();
}
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + queueName
+ " at " + rmqConnectionConfig.getHost(), e);
......
......@@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq.common;
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import com.rabbitmq.client.Channel;
......
......@@ -53,6 +53,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
......@@ -404,7 +405,7 @@ public class RMQSourceTest {
try {
Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
fail("Test environment couldn't be created.");
}
return connectionFactory;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册