[FLINK-6635] [test] Fix ClientConnectionTest

The ClientConnectionTest passed even though the tested behavior was broken, because
we were expecting an exception and checking that a special word was contained in the
exception's message. Unfortunately, we generated an AssertionError with the same
word if the actual logic we wanted to test failed. That caused the test to pass.
上级 392bc713
...@@ -48,6 +48,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; ...@@ -48,6 +48,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
...@@ -56,6 +57,7 @@ import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; ...@@ -56,6 +57,7 @@ import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue; import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -733,10 +735,16 @@ public abstract class ClusterClient { ...@@ -733,10 +735,16 @@ public abstract class ClusterClient {
*/ */
public ActorGateway getJobManagerGateway() throws Exception { public ActorGateway getJobManagerGateway() throws Exception {
LOG.debug("Looking up JobManager"); LOG.debug("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), try {
actorSystemLoader.get(), return LeaderRetrievalUtils.retrieveLeaderGateway(
lookupTimeout); highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
actorSystemLoader.get(),
lookupTimeout);
} catch (LeaderRetrievalException lre) {
throw new FlinkException("Could not connect to the leading JobManager. Please check that the " +
"JobManager is running.", lre);
}
} }
/** /**
......
...@@ -52,7 +52,7 @@ public class StandaloneClusterClient extends ClusterClient { ...@@ -52,7 +52,7 @@ public class StandaloneClusterClient extends ClusterClient {
@Override @Override
public String getWebInterfaceURL() { public String getWebInterfaceURL() {
String host = this.getJobManagerAddress().getHostString(); String host = getJobManagerAddress().getHostString();
int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT); int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
return "http://" + host + ":" + port; return "http://" + host + ":" + port;
} }
...@@ -70,7 +70,7 @@ public class StandaloneClusterClient extends ClusterClient { ...@@ -70,7 +70,7 @@ public class StandaloneClusterClient extends ClusterClient {
throw new RuntimeException("Received the wrong reply " + result + " from cluster."); throw new RuntimeException("Received the wrong reply " + result + " from cluster.");
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Couldn't retrieve the Cluster status.", e); throw new RuntimeException("Couldn't retrieve the cluster status.", e);
} }
} }
......
...@@ -20,14 +20,14 @@ package org.apache.flink.client.program; ...@@ -20,14 +20,14 @@ package org.apache.flink.client.program;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.util.NetUtils; import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test; import org.junit.Test;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*; import static org.junit.Assert.*;
...@@ -35,17 +35,16 @@ import static org.junit.Assert.*; ...@@ -35,17 +35,16 @@ import static org.junit.Assert.*;
* This test starts a job client without the JobManager being reachable. It * This test starts a job client without the JobManager being reachable. It
* tests for a timely error and a meaningful error message. * tests for a timely error and a meaningful error message.
*/ */
public class ClientConnectionTest { public class ClientConnectionTest extends TestLogger {
private static final long CONNECT_TIMEOUT = 2 * 1000; // 2 seconds private static final long CONNECT_TIMEOUT = 100L; // 100 ms
private static final long ASK_STARTUP_TIMEOUT = 100 * 1000; // 100 seconds private static final long ASK_STARTUP_TIMEOUT = 20000L; // 20 seconds
private static final long MAX_DELAY = 50 * 1000; // less than the startup timeout
/** /**
* Tests the behavior against a LOCAL address where no job manager is running. * Tests the behavior against a LOCAL address where no job manager is running.
*/ */
@Test @Test
public void testExceptionWhenLocalJobManagerUnreachablelocal() { public void testExceptionWhenLocalJobManagerUnreachablelocal() throws Exception {
final InetSocketAddress unreachableEndpoint; final InetSocketAddress unreachableEndpoint;
try { try {
...@@ -64,7 +63,7 @@ public class ClientConnectionTest { ...@@ -64,7 +63,7 @@ public class ClientConnectionTest {
* Tests the behavior against a REMOTE address where no job manager is running. * Tests the behavior against a REMOTE address where no job manager is running.
*/ */
@Test @Test
public void testExceptionWhenRemoteJobManagerUnreachable() { public void testExceptionWhenRemoteJobManagerUnreachable() throws Exception {
final InetSocketAddress unreachableEndpoint; final InetSocketAddress unreachableEndpoint;
try { try {
...@@ -79,78 +78,24 @@ public class ClientConnectionTest { ...@@ -79,78 +78,24 @@ public class ClientConnectionTest {
testFailureBehavior(unreachableEndpoint); testFailureBehavior(unreachableEndpoint);
} }
private void testFailureBehavior(final InetSocketAddress unreachableEndpoint) { private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception {
final Configuration config = new Configuration(); final Configuration config = new Configuration();
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s"); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms");
config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s"); config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName()); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
ClusterClient client = new StandaloneClusterClient(config);
try { try {
JobVertex vertex = new JobVertex("Test Vertex"); // we have to query the cluster status to start the connection attempts
vertex.setInvokableClass(TestInvokable.class); client.getClusterStatus();
fail("This should fail with an exception since the endpoint is unreachable.");
final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); } catch (Exception e) {
// check that we have failed with a LeaderRetrievalException which says that we could
Thread invoker = new Thread("test invoker") { // not connect to the leading JobManager
@Override assertTrue(CommonTestUtils.containsCause(e, LeaderRetrievalException.class));
public void run() {
try {
new StandaloneClusterClient(config);
fail("This should fail with an exception since the JobManager is unreachable.");
}
catch (Throwable t) {
synchronized (error) {
error.set(t);
error.notifyAll();
}
}
}
};
invoker.setDaemon(true);
invoker.start();
try {
// wait until the caller is successful, for at most the given time
long now = System.nanoTime();
long deadline = now + MAX_DELAY * 1_000_000;
synchronized (error) {
while (invoker.isAlive() && error.get() == null && now < deadline) {
error.wait(1000);
now = System.nanoTime();
}
}
Throwable t = error.get();
if (t == null) {
fail("Job invocation did not fail in expected time interval.");
}
else {
assertNotNull(t.getMessage());
assertTrue(t.getMessage(), t.getMessage().contains("JobManager"));
}
}
finally {
if (invoker.isAlive()) {
invoker.interrupt();
}
}
} }
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
// --------------------------------------------------------------------------------------------
public static class TestInvokable extends AbstractInvokable {
@Override
public void invoke() {}
} }
} }
...@@ -76,7 +76,7 @@ public class LeaderRetrievalUtils { ...@@ -76,7 +76,7 @@ public class LeaderRetrievalUtils {
return Await.result(actorGatewayFuture, timeout); return Await.result(actorGatewayFuture, timeout);
} catch (Exception e) { } catch (Exception e) {
throw new LeaderRetrievalException("Could not retrieve the leader gateway", e); throw new LeaderRetrievalException("Could not retrieve the leader gateway.", e);
} finally { } finally {
try { try {
leaderRetrievalService.stop(); leaderRetrievalService.stop();
......
...@@ -302,4 +302,26 @@ public class CommonTestUtils { ...@@ -302,4 +302,26 @@ public class CommonTestUtils {
throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t); throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
} }
} }
/**
 * Searches the cause chain of a throwable for an exception of the given type.
 *
 * <p>The search starts at {@code throwable} itself and follows {@link Throwable#getCause()} until
 * the chain ends. An element matches when it is an instance of {@code cause}, which means that
 * instances of subclasses of {@code cause} match as well.
 *
 * @param throwable Throwable whose cause chain is searched; {@code null} yields {@code false}
 * @param cause Type of the cause to look for
 * @return True if the given Throwable or one of its causes is an instance of the given type; otherwise false
 */
public static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause) {
	// walk from the outermost throwable down the chain of causes
	for (Throwable link = throwable; link != null; link = link.getCause()) {
		if (cause.isInstance(link)) {
			return true;
		}
	}

	// reached the end of the chain without finding a matching type
	return false;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册