Commit 8656ed5e authored by Robert Metzger

[FLINK-3455] Bump Kafka to 0.9.0.1 and 0.8.2.2

This closes #1684
Parent: bfc14ebc
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
-<kafka.version>0.8.2.0</kafka.version>
+<kafka.version>0.8.2.2</kafka.version>
</properties>
<dependencies>
......
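Note: `<kafka.version>` sits in a `<properties>` block precisely because of the comment above it ("Allow users to pass custom connector versions"), so the default can be overridden at build time, e.g. with `mvn clean install -Dkafka.version=0.8.2.1` (assuming a standard Maven setup). This hunk only moves the default from 0.8.2.0 to the 0.8.2.2 bugfix release.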
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.junit.Test;
-public class KafkaShortRetention08Test extends KafkaShortRetentionTestBase {
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
@Test(timeout=60000)
public void testAutoOffsetReset() throws Exception {
......
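The `*Test` to `*ITCase` rename is not cosmetic: Surefire's default includes run `*Test` classes in the `test` phase, while `*ITCase` matches the Maven Failsafe plugin's default includes, so this class (and its 0.9 counterpart further down) now runs as an integration test under the Failsafe plugin added below.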
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
-<kafka.version>0.9.0.0</kafka.version>
+<kafka.version>0.9.0.1</kafka.version>
</properties>
<dependencies>
@@ -111,12 +111,22 @@ under the License.
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx900m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</plugin>
+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-failsafe-plugin</artifactId>
+	<configuration>
+		<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+		<forkCount>1</forkCount>
+		<argLine>-Xms256m -Xmx900m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+	</configuration>
+</plugin>
</plugins>
......
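The new `maven-failsafe-plugin` entry deliberately mirrors the Surefire configuration above, down to the single fork (`forkCount` of 1) and the same heap and log4j settings in `argLine`, since the renamed `*ITCase` tests use the same heavyweight mini clusters. With this in place, the integration tests run in the `integration-test` phase, e.g. via `mvn verify`.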
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.junit.Test;
-public class KafkaShortRetention09Test extends KafkaShortRetentionTestBase {
+public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
@Test(timeout=60000)
public void testAutoOffsetReset() throws Exception {
......
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
-<kafka.version>0.8.2.0</kafka.version>
+<kafka.version>0.8.2.2</kafka.version>
</properties>
<dependencies>
......
@@ -47,6 +47,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -80,6 +81,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.Rule;
@@ -144,14 +146,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
properties.setProperty("bootstrap.servers", "localhost:80");
properties.setProperty("zookeeper.connect", "localhost:80");
properties.setProperty("group.id", "test");
properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
properties.setProperty("session.timeout.ms", "2000");
properties.setProperty("fetch.max.wait.ms", "2000");
properties.setProperty("heartbeat.interval.ms", "1000");
FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
DataStream<String> stream = see.addSource(source);
stream.print();
see.execute("No broker test");
-} catch(RuntimeException re){
-	Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
-			re.getClass().equals(RuntimeException.class) &&
-			re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+} catch(RuntimeException re) {
+	if(kafkaServer.getVersion().equals("0.9")) {
+		Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+				re.getClass().equals(TimeoutException.class) &&
+				re.getMessage().contains("Timeout expired while fetching topic metadata"));
+	} else {
+		Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+				re.getClass().equals(RuntimeException.class) &&
+				re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+	}
}
}
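For context, the two branches reflect how the two client generations fail when no broker is reachable: the 0.8 code path surfaces Flink's own `RuntimeException` ("Unable to retrieve any partitions..."), while the Kafka 0.9 client's blocking metadata lookup gives up with an `org.apache.kafka.common.errors.TimeoutException` once `request.timeout.ms` expires, which is why the fail-fast properties added above matter. A minimal, standalone sketch of that 0.9 behaviour (the class name is mine, and the raw `KafkaConsumer` stands in for the equivalent metadata lookup the Flink consumer performs internally):

```java
// Hedged sketch, not Flink code: shows how the Kafka 0.9 client reports an
// unreachable broker. Assumes kafka-clients 0.9.0.1 on the classpath and that
// nothing listens on localhost:80 (the same dummy address as in the test).
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FailFastMetadataSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:80");
		props.setProperty("group.id", "test");
		// the same fail-fast settings the test sets above
		props.setProperty("request.timeout.ms", "3000");
		props.setProperty("session.timeout.ms", "2000");
		props.setProperty("fetch.max.wait.ms", "2000");
		props.setProperty("heartbeat.interval.ms", "1000");
		props.setProperty("key.deserializer", StringDeserializer.class.getName());
		props.setProperty("value.deserializer", StringDeserializer.class.getName());

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		try {
			// blocks until metadata arrives or request.timeout.ms expires
			consumer.partitionsFor("doesntexist");
		} catch (TimeoutException e) {
			// Kafka 0.9: "Timeout expired while fetching topic metadata"
			System.out.println("expected: " + e.getMessage());
		} finally {
			consumer.close();
		}
	}
}
```

Note that `TimeoutException` extends `RuntimeException`, which is why the test can keep a single `catch(RuntimeException re)` and branch on the concrete class per Kafka version.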
/**
@@ -780,9 +792,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-		new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
@@ -792,8 +801,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// add consuming topology:
Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
consumerProps.setProperty("queued.max.message.chunks", "1");
FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
@@ -822,7 +831,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// add producing topology
Properties producerProps = new Properties();
producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 14));
producerProps.setProperty("retries", "3");
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
@@ -840,10 +849,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
Random rnd = new Random();
long cnt = 0;
-int fifteenMb = 1024 * 1024 * 15;
+int sevenMb = 1024 * 1024 * 7;
while (running) {
-byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
ctx.collect(new Tuple2<>(cnt++, wl));
Thread.sleep(100);
......
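The new numbers are mutually consistent: `rnd.nextInt(sevenMb)` returns at most `sevenMb - 1`, so every generated value is between 7 MB and just under 14 MB and, ignoring Kafka's small per-record overhead, fits under both the producer's `max.request.size` and the consumer's 14 MB fetch limits set above, at roughly half the memory footprint of the old 15-30 MB records. A quick check of that bound (names are illustrative):

```java
// Hedged arithmetic sketch of the new size limits; variable names are mine.
public class RecordSizeCheck {
	public static void main(String[] args) {
		int sevenMb = 1024 * 1024 * 7;
		int fourteenMb = 1024 * 1024 * 14; // max.request.size and max.partition.fetch.bytes
		// rnd.nextInt(sevenMb) yields 0 .. sevenMb - 1, so payloads span
		// sevenMb .. 2 * sevenMb - 1 bytes and stay below the 14 MB limits:
		int largestPayload = sevenMb + (sevenMb - 1);
		System.out.println(largestPayload + " < " + fourteenMb + " : "
				+ (largestPayload < fourteenMb)); // 14680063 < 14680064 : true
	}
}
```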