From 8656ed5ea0b329e2d485600ce24aaf1a9e724db0 Mon Sep 17 00:00:00 2001
From: Robert Metzger
Date: Fri, 19 Feb 2016 20:59:58 +0100
Subject: [PATCH] [FLINK-3455] Bump Kafka to 0.9.0.1 and 0.8.2.2

This closes #1684
---
 .../flink-connector-kafka-0.8/pom.xml         |  2 +-
 ....java => KafkaShortRetention08ITCase.java} |  2 +-
 .../flink-connector-kafka-0.9/pom.xml         | 12 ++++++-
 ....java => KafkaShortRetention09ITCase.java} |  2 +-
 .../flink-connector-kafka-base/pom.xml        |  2 +-
 .../kafka/KafkaConsumerTestBase.java          | 33 ++++++++++++-------
 6 files changed, 36 insertions(+), 17 deletions(-)
 rename flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/{KafkaShortRetention08Test.java => KafkaShortRetention08ITCase.java} (93%)
 rename flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/{KafkaShortRetention09Test.java => KafkaShortRetention09ITCase.java} (93%)

diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 4aa01b12038..69b35c9a586 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
-		<kafka.version>0.8.2.0</kafka.version>
+		<kafka.version>0.8.2.2</kafka.version>
 
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
similarity index 93%
rename from flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08Test.java
rename to flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
index b17006222bd..21140ddec18 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08Test.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-public class KafkaShortRetention08Test extends KafkaShortRetentionTestBase {
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
 
 	@Test(timeout=60000)
 	public void testAutoOffsetReset() throws Exception {
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index 248936f14b2..fce1536821a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
-		<kafka.version>0.9.0.0</kafka.version>
+		<kafka.version>0.9.0.1</kafka.version>
 
@@ -111,12 +111,22 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx900m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+				</configuration>
+			</plugin>
+
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-failsafe-plugin</artifactId>
 				<configuration>
 					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx900m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 				</configuration>
 			</plugin>
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09Test.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
similarity index 93%
rename from flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09Test.java
rename to flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
index 93e42ac17c4..74b35af2f5c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09Test.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-public class KafkaShortRetention09Test extends KafkaShortRetentionTestBase {
+public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
 
 	@Test(timeout=60000)
 	public void testAutoOffsetReset() throws Exception {
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 4b7ab070d20..7a3a400634c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
-		<kafka.version>0.8.2.0</kafka.version>
+		<kafka.version>0.8.2.2</kafka.version>
 
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 800bfa7fffb..9377cee02fa 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -47,6 +47,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -80,6 +81,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -144,14 +146,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			properties.setProperty("bootstrap.servers", "localhost:80");
 			properties.setProperty("zookeeper.connect", "localhost:80");
 			properties.setProperty("group.id", "test");
+			properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
+			properties.setProperty("session.timeout.ms", "2000");
+			properties.setProperty("fetch.max.wait.ms", "2000");
+			properties.setProperty("heartbeat.interval.ms", "1000");
 			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
 			DataStream<String> stream = see.addSource(source);
 			stream.print();
 			see.execute("No broker test");
-		} catch(RuntimeException re){
-			Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
-					re.getClass().equals(RuntimeException.class) &&
-					re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+		} catch(RuntimeException re) {
+			if(kafkaServer.getVersion().equals("0.9")) {
+				Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+						re.getClass().equals(TimeoutException.class) &&
+						re.getMessage().contains("Timeout expired while fetching topic metadata"));
+			} else {
+				Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+						re.getClass().equals(RuntimeException.class) &&
+						re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+			}
 		}
 	}
 
 	/**
@@ -780,9 +792,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
 				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
 
-		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -792,8 +801,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// add consuming topology:
 		Properties consumerProps = new Properties();
 		consumerProps.putAll(standardProps);
-		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
-		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
+		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
 		consumerProps.setProperty("queued.max.message.chunks", "1");
 
 		FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
@@ -822,7 +831,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// add producing topology
 		Properties producerProps = new Properties();
-		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 14));
 		producerProps.setProperty("retries", "3");
 		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
@@ -840,10 +849,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
 				Random rnd = new Random();
 				long cnt = 0;
-				int fifteenMb = 1024 * 1024 * 15;
+				int sevenMb = 1024 * 1024 * 7;
 
 				while (running) {
-					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+					byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
 					ctx.collect(new Tuple2<>(cnt++, wl));
 					Thread.sleep(100);
-- 
GitLab
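
Note (editorial, not part of the patch): the reworked no-broker assertion reflects a behavior change in the Kafka 0.9 consumer. When no broker is reachable, the 0.9 client's metadata fetch fails with org.apache.kafka.common.errors.TimeoutException rather than the generic RuntimeException the 0.8 code path raised, which is why the test now dispatches on kafkaServer.getVersion(). The sketch below uses only the plain Kafka 0.9 client API to reproduce that failure mode outside Flink; the class name NoBrokerProbe is illustrative, and the port and property values mirror the test.

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class NoBrokerProbe {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:80"); // nothing is listening here
		props.setProperty("group.id", "test");
		// Same fail-fast tuning the patch adds; note the 0.9 client requires
		// request.timeout.ms to exceed session.timeout.ms and fetch.max.wait.ms.
		props.setProperty("request.timeout.ms", "3000");
		props.setProperty("session.timeout.ms", "2000");
		props.setProperty("fetch.max.wait.ms", "2000");
		props.setProperty("heartbeat.interval.ms", "1000");
		props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
		props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			// On 0.9.0.1 this surfaces a TimeoutException with the message
			// "Timeout expired while fetching topic metadata" once the request
			// timeout elapses, which is exactly what the updated test asserts.
			consumer.partitionsFor("doesntexist");
		}
	}
}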
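
Note on the resized big-record test (editorial, not part of the patch): the constants move together, and the relationship between them is the point. Records now top out just under 14 MB, matching the 14 MB producer and consumer caps, where the old values paired 15 to 30 MB records with 30 to 40 MB caps; the smaller numbers plausibly ease memory pressure in the 900 MB test JVMs configured above. A minimal sketch of the arithmetic, ignoring the few bytes of serialization framing around the payload:

public class SizeBudget {
	public static void main(String[] args) {
		int sevenMb = 1024 * 1024 * 7;

		// The source builds new byte[sevenMb + rnd.nextInt(sevenMb)];
		// Random.nextInt(n) returns at most n - 1, so the payload tops
		// out one byte short of 14 MB.
		int maxPayload = sevenMb + (sevenMb - 1);

		int maxRequestSize    = 1024 * 1024 * 14; // producer: max.request.size
		int maxPartitionFetch = 1024 * 1024 * 14; // consumer: max.partition.fetch.bytes

		System.out.println(maxPayload < maxRequestSize);    // true: every record can be produced
		System.out.println(maxPayload < maxPartitionFetch); // true: every record can be fetched
	}
}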