提交 73ab3175 编写于 作者: Z zentol

[FLINK-6825] Activate checkstyle for runtime/heartbeat

This closes #4067.
上级 1779a0eb
...@@ -434,7 +434,6 @@ under the License. ...@@ -434,7 +434,6 @@ under the License.
**/runtime/concurrent/**, **/runtime/concurrent/**,
**/runtime/execution/**, **/runtime/execution/**,
**/runtime/executiongraph/**, **/runtime/executiongraph/**,
**/runtime/heartbeat/**,
**/runtime/highavailability/**, **/runtime/highavailability/**,
**/runtime/instance/**, **/runtime/instance/**,
**/runtime/io/**, **/runtime/io/**,
......
...@@ -24,9 +24,9 @@ import org.apache.flink.runtime.concurrent.Future; ...@@ -24,9 +24,9 @@ import org.apache.flink.runtime.concurrent.Future;
/** /**
* Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used
* for the following things: * for the following things:
* <p> *
* <ul> * <ul>
* <il>Notifications about heartbeat timeouts</il> * <li>Notifications about heartbeat timeouts</li>
* <li>Payload reports of incoming heartbeats</li> * <li>Payload reports of incoming heartbeats</li>
* <li>Retrieval of payloads for outgoing heartbeats</li> * <li>Retrieval of payloads for outgoing heartbeats</li>
* </ul> * </ul>
......
...@@ -21,13 +21,8 @@ package org.apache.flink.runtime.heartbeat; ...@@ -21,13 +21,8 @@ package org.apache.flink.runtime.heartbeat;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
/** /**
* A heartbeat manager has to be able to do the following things: * A heartbeat manager has to be able to start/stop monitoring a {@link HeartbeatTarget}, and report heartbeat timeouts
* * for this target.
* <ul>
* <li>Monitor {@link HeartbeatTarget} and report heartbeat timeouts for this target</li>
* <li>Stop monitoring a {@link HeartbeatTarget}</li>
* </ul>
*
* *
* @param <I> Type of the incoming payload * @param <I> Type of the incoming payload
* @param <O> Type of the outgoing payload * @param <O> Type of the outgoing payload
......
...@@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.ApplyFunction; ...@@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
...@@ -47,27 +49,27 @@ import java.util.concurrent.atomic.AtomicReference; ...@@ -47,27 +49,27 @@ import java.util.concurrent.atomic.AtomicReference;
@ThreadSafe @ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
/** Heartbeat timeout interval in milli seconds */ /** Heartbeat timeout interval in milli seconds. */
private final long heartbeatTimeoutIntervalMs; private final long heartbeatTimeoutIntervalMs;
/** Resource ID which is used to mark one own's heartbeat signals */ /** Resource ID which is used to mark one own's heartbeat signals. */
private final ResourceID ownResourceID; private final ResourceID ownResourceID;
/** Heartbeat listener with which the heartbeat manager has been associated */ /** Heartbeat listener with which the heartbeat manager has been associated. */
private final HeartbeatListener<I, O> heartbeatListener; private final HeartbeatListener<I, O> heartbeatListener;
/** Executor service used to run heartbeat timeout notifications */ /** Executor service used to run heartbeat timeout notifications. */
private final ScheduledExecutor scheduledExecutor; private final ScheduledExecutor scheduledExecutor;
protected final Logger log; protected final Logger log;
/** Map containing the heartbeat monitors associated with the respective resource ID */ /** Map containing the heartbeat monitors associated with the respective resource ID. */
private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets; private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;
/** Execution context used to run future callbacks */ /** Execution context used to run future callbacks. */
private final Executor executor; private final Executor executor;
/** Running state of the heartbeat manager */ /** Running state of the heartbeat manager. */
protected volatile boolean stopped; protected volatile boolean stopped;
public HeartbeatManagerImpl( public HeartbeatManagerImpl(
...@@ -240,18 +242,18 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { ...@@ -240,18 +242,18 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
*/ */
static class HeartbeatMonitor<O> implements Runnable { static class HeartbeatMonitor<O> implements Runnable {
/** Resource ID of the monitored heartbeat target */ /** Resource ID of the monitored heartbeat target. */
private final ResourceID resourceID; private final ResourceID resourceID;
/** Associated heartbeat target */ /** Associated heartbeat target. */
private final HeartbeatTarget<O> heartbeatTarget; private final HeartbeatTarget<O> heartbeatTarget;
private final ScheduledExecutor scheduledExecutor; private final ScheduledExecutor scheduledExecutor;
/** Listener which is notified about heartbeat timeouts */ /** Listener which is notified about heartbeat timeouts. */
private final HeartbeatListener<?, ?> heartbeatListener; private final HeartbeatListener<?, ?> heartbeatListener;
/** Maximum heartbeat timeout interval */ /** Maximum heartbeat timeout interval. */
private final long heartbeatTimeoutIntervalMs; private final long heartbeatTimeoutIntervalMs;
private volatile ScheduledFuture<?> futureTimeout; private volatile ScheduledFuture<?> futureTimeout;
......
...@@ -23,6 +23,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction; ...@@ -23,6 +23,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
......
...@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; ...@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
/** /**
...@@ -31,10 +32,10 @@ import org.slf4j.Logger; ...@@ -31,10 +32,10 @@ import org.slf4j.Logger;
*/ */
public class HeartbeatServices { public class HeartbeatServices {
/** Heartbeat interval for the created services */ /** Heartbeat interval for the created services. */
protected final long heartbeatInterval; protected final long heartbeatInterval;
/** Heartbeat timeout for the created services */ /** Heartbeat timeout for the created services. */
protected final long heartbeatTimeout; protected final long heartbeatTimeout;
public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) { public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
......
...@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; ...@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -46,12 +47,14 @@ import static org.mockito.Matchers.anyLong; ...@@ -46,12 +47,14 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/**
* Tests for the {@link HeartbeatManager}.
*/
public class HeartbeatManagerTest extends TestLogger { public class HeartbeatManagerTest extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class); private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
...@@ -217,7 +220,7 @@ public class HeartbeatManagerTest extends TestLogger { ...@@ -217,7 +220,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatListener2, heartbeatListener2,
new DirectExecutorService(), new DirectExecutorService(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);; LOG);
heartbeatManager.monitorTarget(resourceID2, heartbeatManager2); heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
heartbeatManager2.monitorTarget(resourceID, heartbeatManager); heartbeatManager2.monitorTarget(resourceID, heartbeatManager);
...@@ -239,7 +242,7 @@ public class HeartbeatManagerTest extends TestLogger { ...@@ -239,7 +242,7 @@ public class HeartbeatManagerTest extends TestLogger {
} }
/** /**
* Tests that after unmonitoring a target, there won't be a timeout triggered * Tests that after unmonitoring a target, there won't be a timeout triggered.
*/ */
@Test @Test
public void testTargetUnmonitoring() throws InterruptedException, ExecutionException { public void testTargetUnmonitoring() throws InterruptedException, ExecutionException {
...@@ -265,7 +268,6 @@ public class HeartbeatManagerTest extends TestLogger { ...@@ -265,7 +268,6 @@ public class HeartbeatManagerTest extends TestLogger {
Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture(); Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture();
try { try {
timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
fail("Timeout should time out."); fail("Timeout should time out.");
......
...@@ -21,8 +21,12 @@ package org.apache.flink.runtime.heartbeat; ...@@ -21,8 +21,12 @@ package org.apache.flink.runtime.heartbeat;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
/**
* A {@link HeartbeatServices} that allows the injection of a {@link ScheduledExecutor}.
*/
public class TestingHeartbeatServices extends HeartbeatServices { public class TestingHeartbeatServices extends HeartbeatServices {
private final ScheduledExecutor scheduledExecutorToUse; private final ScheduledExecutor scheduledExecutorToUse;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册