diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 81169e1ccc7f9aa26c9b9fe35104430eab5d73da..dbec662e81ca0ea544c9480d1774cab939a11f80 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -434,7 +434,6 @@ under the License. **/runtime/concurrent/**, **/runtime/execution/**, **/runtime/executiongraph/**, - **/runtime/heartbeat/**, **/runtime/highavailability/**, **/runtime/instance/**, **/runtime/io/**, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java index 8c082517d330a3f32fd90c762d8f1f5a65772f45..c6307aad94dd55f3beee8becf1636ff6e4bd1b46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java @@ -24,9 +24,9 @@ import org.apache.flink.runtime.concurrent.Future; /** * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used * for the following things: - *

+ * *

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java index a648b1fb6710b92a11d4f7cdf91ba04a7a4891f6..928c826335b976f4a9bfef998418665b7c01146c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java @@ -21,13 +21,8 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; /** - * A heartbeat manager has to be able to do the following things: - * - * - * + * A heartbeat manager has to be able to start/stop monitoring a {@link HeartbeatTarget}, and report heartbeat timeouts + * for this target. * * @param Type of the incoming payload * @param Type of the outgoing payload diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index 28ab086647f4f430ce1e47177fed741f6ba3337c..d97cfa0882e9514e74c71068ab4ea2d8edeecb81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import javax.annotation.concurrent.ThreadSafe; + import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -47,27 +49,27 @@ import java.util.concurrent.atomic.AtomicReference; @ThreadSafe public class HeartbeatManagerImpl implements HeartbeatManager { - /** Heartbeat timeout interval in milli seconds */ + /** Heartbeat timeout interval in milli seconds. */ private final long heartbeatTimeoutIntervalMs; - /** Resource ID which is used to mark one own's heartbeat signals */ + /** Resource ID which is used to mark one own's heartbeat signals. */ private final ResourceID ownResourceID; - /** Heartbeat listener with which the heartbeat manager has been associated */ + /** Heartbeat listener with which the heartbeat manager has been associated. */ private final HeartbeatListener heartbeatListener; - /** Executor service used to run heartbeat timeout notifications */ + /** Executor service used to run heartbeat timeout notifications. */ private final ScheduledExecutor scheduledExecutor; protected final Logger log; - /** Map containing the heartbeat monitors associated with the respective resource ID */ + /** Map containing the heartbeat monitors associated with the respective resource ID. */ private final ConcurrentHashMap> heartbeatTargets; - /** Execution context used to run future callbacks */ + /** Execution context used to run future callbacks. */ private final Executor executor; - /** Running state of the heartbeat manager */ + /** Running state of the heartbeat manager. */ protected volatile boolean stopped; public HeartbeatManagerImpl( @@ -240,18 +242,18 @@ public class HeartbeatManagerImpl implements HeartbeatManager { */ static class HeartbeatMonitor implements Runnable { - /** Resource ID of the monitored heartbeat target */ + /** Resource ID of the monitored heartbeat target. */ private final ResourceID resourceID; - /** Associated heartbeat target */ + /** Associated heartbeat target. */ private final HeartbeatTarget heartbeatTarget; private final ScheduledExecutor scheduledExecutor; - /** Listener which is notified about heartbeat timeouts */ + /** Listener which is notified about heartbeat timeouts. */ private final HeartbeatListener heartbeatListener; - /** Maximum heartbeat timeout interval */ + /** Maximum heartbeat timeout interval. */ private final long heartbeatTimeoutIntervalMs; private volatile ScheduledFuture futureTimeout; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index 53837d1482ff76081dd78b70be5d3138b7faf115..5b3a95748f82e7ade6818c6b8f15e22bce3adb88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; + import org.slf4j.Logger; import java.util.concurrent.Executor; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java index 7d55b9c56301b645c6629317987ac73b07e610ff..22df75155c3015ca7eb9e7237bfb11a6a0c5391d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; /** @@ -31,10 +32,10 @@ import org.slf4j.Logger; */ public class HeartbeatServices { - /** Heartbeat interval for the created services */ + /** Heartbeat interval for the created services. */ protected final long heartbeatInterval; - /** Heartbeat timeout for the created services */ + /** Heartbeat timeout for the created services. */ protected final long heartbeatTimeout; public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 3da18aaea38d64a4662b0110e1b7dcc3f51bb4b9..031f48cc577e025a786e732755ed428859b2e264 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.TestLogger; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +47,14 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - +/** + * Tests for the {@link HeartbeatManager}. + */ public class HeartbeatManagerTest extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class); @@ -217,7 +220,7 @@ public class HeartbeatManagerTest extends TestLogger { heartbeatListener2, new DirectExecutorService(), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), - LOG);; + LOG); heartbeatManager.monitorTarget(resourceID2, heartbeatManager2); heartbeatManager2.monitorTarget(resourceID, heartbeatManager); @@ -239,7 +242,7 @@ public class HeartbeatManagerTest extends TestLogger { } /** - * Tests that after unmonitoring a target, there won't be a timeout triggered + * Tests that after unmonitoring a target, there won't be a timeout triggered. */ @Test public void testTargetUnmonitoring() throws InterruptedException, ExecutionException { @@ -265,7 +268,6 @@ public class HeartbeatManagerTest extends TestLogger { Future timeout = heartbeatListener.getTimeoutFuture(); - try { timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); fail("Timeout should time out."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java index e628db5ae63f0122a56aae057a9e204bbd4b2bc2..8bf663cc2005d3b79f248daeefb5b5566f7773c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java @@ -21,8 +21,12 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; +/** + * A {@link HeartbeatServices} that allows the injection of a {@link ScheduledExecutor}. + */ public class TestingHeartbeatServices extends HeartbeatServices { private final ScheduledExecutor scheduledExecutorToUse;