diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index b0817217a99025ba58b32991843a61cad3bcd1dc..e09a0b6b476d5e96d55878593831cd9fcd648b41 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; @@ -56,6 +57,7 @@ import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; @@ -733,10 +735,16 @@ public abstract class ClusterClient { */ public ActorGateway getJobManagerGateway() throws Exception { LOG.debug("Looking up JobManager"); - return LeaderRetrievalUtils.retrieveLeaderGateway( - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), - actorSystemLoader.get(), - lookupTimeout); + + try { + return LeaderRetrievalUtils.retrieveLeaderGateway( + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + actorSystemLoader.get(), + lookupTimeout); + } catch (LeaderRetrievalException lre) { + throw new FlinkException("Could not connect to the leading JobManager. Please check that the " + + "JobManager is running.", lre); + } } /** diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 7517504ba08efb3eb375420eaa745a4df54c1849..b00e519460383fe2d9918b88bff8c5225d7540de 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -52,7 +52,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { - String host = this.getJobManagerAddress().getHostString(); + String host = getJobManagerAddress().getHostString(); int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT); return "http://" + host + ":" + port; } @@ -70,7 +70,7 @@ public class StandaloneClusterClient extends ClusterClient { throw new RuntimeException("Received the wrong reply " + result + " from cluster."); } } catch (Exception e) { - throw new RuntimeException("Couldn't retrieve the Cluster status.", e); + throw new RuntimeException("Couldn't retrieve the cluster status.", e); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index fc24a9d10ae0f4b9dac2332862b8f90ba24cbff5..3bfaa952de84a9be69e0f77d12b6ac91a60739e7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -20,14 +20,14 @@ package org.apache.flink.client.program; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -35,17 +35,16 @@ import static org.junit.Assert.*; * This test starts a job client without the JobManager being reachable. It * tests for a timely error and a meaningful error message. */ -public class ClientConnectionTest { +public class ClientConnectionTest extends TestLogger { - private static final long CONNECT_TIMEOUT = 2 * 1000; // 2 seconds - private static final long ASK_STARTUP_TIMEOUT = 100 * 1000; // 100 seconds - private static final long MAX_DELAY = 50 * 1000; // less than the startup timeout + private static final long CONNECT_TIMEOUT = 100L; // 100 ms + private static final long ASK_STARTUP_TIMEOUT = 20000L; // 10 seconds /** * Tests the behavior against a LOCAL address where no job manager is running. */ @Test - public void testExceptionWhenLocalJobManagerUnreachablelocal() { + public void testExceptionWhenLocalJobManagerUnreachablelocal() throws Exception { final InetSocketAddress unreachableEndpoint; try { @@ -64,7 +63,7 @@ public class ClientConnectionTest { * Tests the behavior against a REMOTE address where no job manager is running. */ @Test - public void testExceptionWhenRemoteJobManagerUnreachable() { + public void testExceptionWhenRemoteJobManagerUnreachable() throws Exception { final InetSocketAddress unreachableEndpoint; try { @@ -79,78 +78,24 @@ public class ClientConnectionTest { testFailureBehavior(unreachableEndpoint); } - private void testFailureBehavior(final InetSocketAddress unreachableEndpoint) { + private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception { final Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s"); - config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s"); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms"); + config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms"); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort()); + ClusterClient client = new StandaloneClusterClient(config); try { - JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setInvokableClass(TestInvokable.class); - - final AtomicReference error = new AtomicReference(); - - Thread invoker = new Thread("test invoker") { - @Override - public void run() { - try { - new StandaloneClusterClient(config); - fail("This should fail with an exception since the JobManager is unreachable."); - } - catch (Throwable t) { - synchronized (error) { - error.set(t); - error.notifyAll(); - } - } - } - }; - - invoker.setDaemon(true); - invoker.start(); - - try { - // wait until the caller is successful, for at most the given time - long now = System.nanoTime(); - long deadline = now + MAX_DELAY * 1_000_000; - - synchronized (error) { - while (invoker.isAlive() && error.get() == null && now < deadline) { - error.wait(1000); - now = System.nanoTime(); - } - } - - Throwable t = error.get(); - if (t == null) { - fail("Job invocation did not fail in expected time interval."); - } - else { - assertNotNull(t.getMessage()); - assertTrue(t.getMessage(), t.getMessage().contains("JobManager")); - } - } - finally { - if (invoker.isAlive()) { - invoker.interrupt(); - } - } + // we have to query the cluster status to start the connection attempts + client.getClusterStatus(); + fail("This should fail with an exception since the endpoint is unreachable."); + } catch (Exception e) { + // check that we have failed with a LeaderRetrievalException which says that we could + // not connect to the leading JobManager + assertTrue(CommonTestUtils.containsCause(e, LeaderRetrievalException.class)); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - public static class TestInvokable extends AbstractInvokable { - - @Override - public void invoke() {} } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java index 073c52b17e0c3c297b9a1e489da4fd77741831a6..009bec62b2f3b983592d023f14c2821a039d1ca9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java @@ -76,7 +76,7 @@ public class LeaderRetrievalUtils { return Await.result(actorGatewayFuture, timeout); } catch (Exception e) { - throw new LeaderRetrievalException("Could not retrieve the leader gateway", e); + throw new LeaderRetrievalException("Could not retrieve the leader gateway.", e); } finally { try { leaderRetrievalService.stop(); diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index cf2bb7f20b9a1fdd033ab5249d7ccee0e6d46bd9..33811f2a3d4d14ad437b10ea65fa08d8b442a9f4 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -302,4 +302,26 @@ public class CommonTestUtils { throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t); } } + + /** + * Checks whether the given throwable contains the given cause as a cause. The cause is not checked + * on equality but on type equality. + * + * @param throwable Throwable to check for the cause + * @param cause Cause to look for + * @return True if the given Throwable contains the given cause (type equality); otherwise false + */ + public static boolean containsCause(Throwable throwable, Class cause) { + Throwable current = throwable; + + while (current != null) { + if (cause.isAssignableFrom(current.getClass())) { + return true; + } + + current = current.getCause(); + } + + return false; + } }