[FLINK-7651] [flip-6] Delay RetryingRegistration in case of connection error

Similar to a registration error, we should also delay the retrying registration in case of a
connection error, which can happen if the remote endpoint has not been started yet.

This closes #4686.
上级 c6243b8b
......@@ -151,6 +151,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
* Cancels the registration procedure.
*/
public void cancel() {
// Cancel the completion future; the argument is the mayInterruptIfRunning flag.
// NOTE(review): completionFuture is presumably a java.util.concurrent.CompletableFuture,
// for which that flag has no effect -- confirm against the field declaration.
completionFuture.cancel(false);
// Raise the flag that startRegistration() and the retry callback check before
// triggering another attempt.
canceled = true;
}
......@@ -175,6 +176,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
*/
@SuppressWarnings("unchecked")
public void startRegistration() {
if (canceled) {
// we already got canceled
return;
}
try {
// trigger resolution of the resource manager address to a callable gateway
final CompletableFuture<G> resourceManagerFuture;
......@@ -199,16 +205,17 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
// upon failure, retry, unless this is cancelled
resourceManagerAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
if (failure != null && !isCanceled()) {
log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
startRegistration();
if (failure != null && !canceled) {
log.warn("Could not resolve {} address {}, retrying in {} ms", targetName, targetAddress, delayOnError, failure);
startRegistrationLater(delayOnError);
}
},
rpcService.getExecutor());
}
catch (Throwable t) {
cancel();
completionFuture.completeExceptionally(t);
cancel();
}
}
......@@ -280,8 +287,8 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
rpcService.getExecutor());
}
catch (Throwable t) {
cancel();
completionFuture.completeExceptionally(t);
cancel();
}
}
......@@ -293,4 +300,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
}
}, delay, TimeUnit.MILLISECONDS);
}
/**
 * Schedules another call to {@link #startRegistration()} on the RPC service
 * once the given delay has elapsed.
 *
 * @param delay time to wait before the next attempt, in milliseconds
 */
private void startRegistrationLater(final long delay) {
    // Name the retry action so the scheduling call reads as a single line.
    final Runnable retryAttempt = this::startRegistration;
    rpcService.scheduleRunnable(retryAttempt, delay, TimeUnit.MILLISECONDS);
}
}
......@@ -22,16 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -115,7 +116,7 @@ public class RetryingRegistrationTest extends TestLogger {
final String testId = "laissez les bon temps roulez";
final UUID leaderId = UUID.randomUUID();
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = TestingUtils.defaultExecutor();
TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
try {
......@@ -126,20 +127,36 @@ public class RetryingRegistrationTest extends TestLogger {
CompletableFuture.completedFuture(testGateway) // second connection attempt succeeds
);
when(rpc.getExecutor()).thenReturn(executor);
when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenAnswer(
(InvocationOnMock invocation) -> {
final Runnable runnable = invocation.getArgumentAt(0, Runnable.class);
final long delay = invocation.getArgumentAt(1, Long.class);
final TimeUnit timeUnit = invocation.getArgumentAt(2, TimeUnit.class);
return TestingUtils.defaultScheduledExecutor().schedule(runnable, delay, timeUnit);
});
TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
long start = System.currentTimeMillis();
registration.startRegistration();
Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
registration.getFuture().get(10L, TimeUnit.SECONDS);
// measure the duration of the registration --> should be longer than the error delay
long duration = System.currentTimeMillis() - start;
assertTrue(
"The registration should have failed the first time. Thus the duration should be longer than at least a single error delay.",
duration > TestRetryingRegistration.DELAY_ON_ERROR);
// validate correct invocation and result
assertEquals(testId, success.f1.getCorrelationId());
assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
}
finally {
testGateway.stop();
executor.shutdown();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册