提交 3581a335 编写于 作者: P Piotr Nowojski 提交者: Stephan Ewen

[FLINK-7739] [kafka connector] Fix test instabilities

  - Set shorter heartbeats intervals. Default pause value of 60seconds is
    too large (tests would timeout before akka react)

  - Exclude netty dependency from zookeeper. Zookeeper was pulling in
    conflicting Netty version. Conflict was extremly subtle - TaskManager in
    Kafka tests was deadlocking in some rare corner cases.

This closes #4775
上级 a8443a52
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka; package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.MetricOptions;
...@@ -121,6 +122,8 @@ public abstract class KafkaTestBase extends TestLogger { ...@@ -121,6 +122,8 @@ public abstract class KafkaTestBase extends TestLogger {
protected static Configuration getFlinkConfiguration() { protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration(); Configuration flinkConfig = new Configuration();
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
......
...@@ -460,6 +460,12 @@ under the License. ...@@ -460,6 +460,12 @@ under the License.
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
</exclusion> </exclusion>
<!-- Netty from zookeeper is conflicting with akka's, which can cause occasional deadlock inside
Netty threads -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册